1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging, os, re 6 7from autotest_lib.client.bin import utils 8from autotest_lib.client.common_lib import error 9 10class KernelTrace(object): 11 """Allows access and control to Kernel tracing facilities. 12 13 Example code snippet: 14 trace = KernelTrace(events=['mali_dvfs:mali_dvfs_set_clock']) 15 results = trace.read(regexp=r'frequency=(\d+)') 16 17 Public methods: 18 on : Enables tracing 19 off : Disables tracing 20 is_tracing : Returns Boolean of tracing status. 21 event_on : Turns event on. Returns boolean of success 22 event_off : Turns event off. Returns boolean of success 23 flush : Flushes trace buffer 24 read : Reads trace buffer returns list of 25 - tuples if regexp provided 26 - else matching string 27 uptime_secs : Returns float of current uptime. 28 29 Private functions: 30 _onoff : Disable/enable tracing 31 _onoff_event : Disable/enable events 32 33 Private attributes: 34 _buffer : list to hold parsed results from trace buffer 35 _buffer_ptr : integer pointing to last byte read 36 37 TODO(tbroch): List of potential enhancements 38 - currently only supports trace events. Add other tracers. 39 """ 40 _TRACE_ROOT = '/sys/kernel/debug/tracing' 41 _TRACE_EN_PATH = os.path.join(_TRACE_ROOT, 'tracing_enabled') 42 43 def __init__(self, flush=True, events=None, on=True): 44 """Constructor for KernelTrace class""" 45 self._buffer = [] 46 self._buffer_ptr = 0 47 self._events = [] 48 self._on = on 49 50 if flush: 51 self.flush() 52 for event in events: 53 if self.event_on(event): 54 self._events.append(event) 55 if on: 56 self.on() 57 58 59 def __del__(self, flush=True, events=None, on=True): 60 """Deconstructor for KernelTrace class""" 61 for event in self._events: 62 self.event_off(event) 63 if self._on: 64 self.off() 65 66 67 def _onoff(self, val): 68 """Enable/Disable tracing. 69 70 Arguments: 71 val: integer, 1 for on, 0 for off 72 73 Raises: 74 error.TestFail: If unable to enable/disable tracing 75 boolean of tracing on/off status 76 """ 77 utils.write_one_line(self._TRACE_EN_PATH, val) 78 fname = os.path.join(self._TRACE_ROOT, 'tracing_on') 79 result = int(utils.read_one_line(fname).strip()) 80 if not result == val: 81 raise error.TestFail("Unable to %sable tracing" % 82 'en' if val == 1 else 'dis') 83 84 85 def on(self): 86 """Enable tracing.""" 87 return self._onoff(1) 88 89 90 def off(self): 91 """Disable tracing.""" 92 self._onoff(0) 93 94 95 def is_tracing(self): 96 """Is tracing on? 97 98 Returns: 99 True if tracing enabled and at least one event is enabled. 100 """ 101 fname = os.path.join(self._TRACE_ROOT, 'tracing_on') 102 result = int(utils.read_one_line(fname).strip()) 103 if result == 1 and len(self._events) > 0: 104 return True 105 return False 106 107 108 def _event_onoff(self, event, val): 109 """Enable/Disable tracing event. 110 111 TODO(tbroch) Consider allowing wild card enabling of trace events via 112 /sys/kernel/debug/tracing/set_event although it makes filling buffer 113 really easy 114 115 Arguments: 116 event: list of events. 117 See kernel(Documentation/trace/events.txt) for formatting. 118 val: integer, 1 for on, 0 for off 119 120 Returns: 121 True if success, false otherwise 122 """ 123 logging.debug("event_onoff: event:%s val:%d", event, val) 124 event_path = event.replace(':', '/') 125 fname = os.path.join(self._TRACE_ROOT, 'events', event_path, 'enable') 126 127 if not os.path.exists(fname): 128 logging.warning("Unable to locate tracing event %s", fname) 129 return False 130 utils.write_one_line(fname, val) 131 132 fname = os.path.join(self._TRACE_ROOT, "set_event") 133 found = False 134 with open(fname) as fd: 135 for ln in fd.readlines(): 136 logging.debug("set_event ln:%s", ln) 137 if re.findall(event, ln): 138 found = True 139 break 140 141 if val == 1 and not found: 142 logging.warning("Event %s not enabled", event) 143 return False 144 145 if val == 0 and found: 146 logging.warning("Event %s not disabled", event) 147 return False 148 149 return True 150 151 152 def event_on(self, event): 153 return self._event_onoff(event, 1) 154 155 156 def event_off(self, event): 157 return self._event_onoff(event, 0) 158 159 160 def flush(self): 161 """Flush trace buffer. 162 163 Raises: 164 error.TestFail: If unable to flush 165 """ 166 self.off() 167 fname = os.path.join(self._TRACE_ROOT, 'free_buffer') 168 utils.write_one_line(fname, 1) 169 self._buffer_ptr = 0 170 171 fname = os.path.join(self._TRACE_ROOT, 'buffer_size_kb') 172 result = utils.read_one_line(fname).strip() 173 if result is '0': 174 return True 175 return False 176 177 178 def read(self, regexp=None): 179 fname = os.path.join(self._TRACE_ROOT, 'trace') 180 fd = open(fname) 181 fd.seek(self._buffer_ptr) 182 for ln in fd.readlines(): 183 if regexp is None: 184 self._buffer.append(ln) 185 continue 186 results = re.findall(regexp, ln) 187 if results: 188 logging.debug(ln) 189 self._buffer.append(results[0]) 190 self._buffer_ptr = fd.tell() 191 fd.close() 192 return self._buffer 193 194 195 @staticmethod 196 def uptime_secs(): 197 results = utils.read_one_line("/proc/uptime") 198 return float(results.split()[0]) 199