# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import re

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error


class KernelTrace(object):
    r"""Allows access and control to Kernel tracing facilities.

    Example code snippet:
        trace = KernelTrace(events=['mali_dvfs:mali_dvfs_set_clock'])
        results = trace.read(regexp=r'frequency=(\d+)')

    Public methods:
        on          : Enables tracing
        off         : Disables tracing
        is_tracing  : Returns Boolean of tracing status.
        event_on    : Turns event on.  Returns boolean of success
        event_off   : Turns event off.  Returns boolean of success
        flush       : Flushes trace buffer
        read        : Reads trace buffer and returns the accumulated list of
                        - first regexp match per matching line if regexp is
                          provided (a string, or a tuple when the regexp has
                          several groups)
                        - else every raw line read
        uptime_secs : Returns float of current uptime.

    Private functions:
        _onoff       : Disable/enable tracing
        _event_onoff : Disable/enable events

    Private attributes:
        _buffer     : list to hold parsed results from trace buffer
        _buffer_ptr : integer pointing to last byte read
        _events     : list of events successfully enabled by the constructor
        _on         : whether the constructor was asked to enable tracing

    TODO(tbroch): List of potential enhancements
       - currently only supports trace events.  Add other tracers.
    """
    _TRACE_ROOT = '/sys/kernel/debug/tracing'
    _TRACE_ON_PATH = os.path.join(_TRACE_ROOT, 'tracing_on')

    def __init__(self, flush=True, events=None, on=True):
        """Constructor for KernelTrace class

        Arguments:
          flush: boolean, flush the trace buffer before enabling anything.
          events: iterable of event names to enable, or None for no events.
          on: boolean, enable tracing once the events have been set up.
        """
        self._buffer = []
        self._buffer_ptr = 0
        self._events = []
        self._on = on

        if flush:
            self.flush()
        # events defaults to None; treat that as "no events" rather than
        # failing with a TypeError when iterating.
        for event in events or []:
            if self.event_on(event):
                self._events.append(event)
        if on:
            self.on()


    def __del__(self, flush=True, events=None, on=True):
        """Deconstructor for KernelTrace class

        Disables every event the constructor enabled and turns tracing off
        if the constructor turned it on.  The extra parameters are unused and
        only kept so the signature stays unchanged.
        """
        for event in self._events:
            self.event_off(event)
        if self._on:
            self.off()


    def _onoff(self, val):
        """Turn tracing on or off.

        Arguments:
          val: integer, 1 for on, 0 for off

        Raises:
          error.TestFail: If unable to turn tracing on or off.
        """
        utils.write_one_line(self._TRACE_ON_PATH, val)
        # Read the value back to confirm the write took effect.
        result = int(utils.read_one_line(self._TRACE_ON_PATH).strip())
        if result != val:
            # The conditional must be parenthesized: '%' binds tighter than
            # 'if/else', so without parentheses the message would be 'dis'.
            raise error.TestFail("Unable to %sable tracing" %
                                 ('en' if val == 1 else 'dis'))


    def on(self):
        """Enable tracing."""
        return self._onoff(1)


    def off(self):
        """Disable tracing."""
        self._onoff(0)


    def is_tracing(self):
        """Is tracing on?

        Returns:
          True if tracing enabled and at least one event is enabled.
        """
        result = int(utils.read_one_line(self._TRACE_ON_PATH).strip())
        return result == 1 and bool(self._events)


    def _event_onoff(self, event, val):
        """Enable/Disable tracing event.

        TODO(tbroch) Consider allowing wild card enabling of trace events via
            /sys/kernel/debug/tracing/set_event although it makes filling
            buffer really easy

        Arguments:
          event: string, name of the event ('subsystem:event') or of a whole
              subsystem ('subsystem').
              See kernel(Documentation/trace/events.txt) for formatting.
          val: integer, 1 for on, 0 for off

        Returns:
          True if success, false otherwise
        """
        logging.debug("event_onoff: event:%s val:%d", event, val)
        event_path = event.replace(':', '/')
        fname = os.path.join(self._TRACE_ROOT, 'events', event_path, 'enable')

        if not os.path.exists(fname):
            logging.warning("Unable to locate tracing event %s", fname)
            return False
        utils.write_one_line(fname, val)

        # Verify the result against the list of enabled events.  Compare whole
        # names instead of regex/substring matching so that an event does not
        # match a longer one sharing its prefix (e.g. 'sched:sched_wakeup' vs
        # 'sched:sched_wakeup_new') and regex metacharacters in the name are
        # harmless.  The 'event:' prefix test keeps subsystem-level names
        # working, since their entries are listed as 'subsystem:event'.
        fname = os.path.join(self._TRACE_ROOT, "set_event")
        prefix = event + ':'
        found = False
        with open(fname) as fd:
            for ln in fd:
                logging.debug("set_event ln:%s", ln)
                enabled = ln.strip()
                if enabled == event or enabled.startswith(prefix):
                    found = True
                    break

        if val == 1 and not found:
            logging.warning("Event %s not enabled", event)
            return False

        if val == 0 and found:
            logging.warning("Event %s not disabled", event)
            return False

        return True


    def event_on(self, event):
        """Enable a tracing event.

        Arguments:
          event: string, name of the event to enable.

        Returns:
          True if success, false otherwise
        """
        return self._event_onoff(event, 1)


    def event_off(self, event):
        """Disable a tracing event.

        Arguments:
          event: string, name of the event to disable.

        Returns:
          True if success, false otherwise
        """
        return self._event_onoff(event, 0)


    def flush(self):
        """Flush trace buffer.

        Tracing is turned off first and is NOT turned back on afterwards;
        call on() to resume.  The read position is reset, but results already
        accumulated by read() are kept.

        Returns:
          True if buffer_size_kb reads back as '0' after freeing the buffer,
          False otherwise.

        Raises:
          error.TestFail: If unable to turn tracing off.
        """
        self.off()
        fname = os.path.join(self._TRACE_ROOT, 'free_buffer')
        utils.write_one_line(fname, 1)
        self._buffer_ptr = 0

        fname = os.path.join(self._TRACE_ROOT, 'buffer_size_kb')
        result = utils.read_one_line(fname).strip()
        return result == '0'


    def read(self, regexp=None):
        """Read new content of the trace buffer.

        Reading resumes from where the previous call stopped.  Results are
        appended to the ones gathered by earlier calls.

        Arguments:
          regexp: optional regular expression.  When given, only lines that
              match contribute, and only their first match is stored (a string
              for zero or one group, a tuple for several groups).  When None,
              every line is stored unmodified.

        Returns:
          list of all results accumulated so far (the internal buffer itself,
          not a copy).
        """
        fname = os.path.join(self._TRACE_ROOT, 'trace')
        # Context manager so the file is closed even if parsing raises.
        with open(fname) as fd:
            fd.seek(self._buffer_ptr)
            # readlines() rather than iterating fd directly: on Python 3,
            # tell() is disabled while a text file is being iterated.
            for ln in fd.readlines():
                if regexp is None:
                    self._buffer.append(ln)
                    continue
                results = re.findall(regexp, ln)
                if results:
                    logging.debug(ln)
                    self._buffer.append(results[0])
            self._buffer_ptr = fd.tell()
        return self._buffer


    @staticmethod
    def uptime_secs():
        """Returns float of current uptime, first field of /proc/uptime."""
        results = utils.read_one_line("/proc/uptime")
        return float(results.split()[0])