• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, os, re
6
7from autotest_lib.client.bin import utils
8from autotest_lib.client.common_lib import error
9
10class KernelTrace(object):
11    """Allows access and control to Kernel tracing facilities.
12
13    Example code snippet:
14        trace = KernelTrace(events=['mali_dvfs:mali_dvfs_set_clock'])
15        results = trace.read(regexp=r'frequency=(\d+)')
16
17    Public methods:
18        on          : Enables tracing
19        off         : Disables tracing
20        is_tracing  : Returns Boolean of tracing status.
21        event_on    : Turns event on.  Returns boolean of success
22        event_off   : Turns event off.  Returns boolean of success
23        flush       : Flushes trace buffer
24        read        : Reads trace buffer returns list of
25                      - tuples if regexp provided
26                      - else matching string
27        uptime_secs : Returns float of current uptime.
28
29    Private functions:
30        _onoff       : Disable/enable tracing
31        _onoff_event : Disable/enable events
32
33    Private attributes:
34        _buffer      : list to hold parsed results from trace buffer
35        _buffer_ptr  : integer pointing to last byte read
36
37    TODO(tbroch):  List of potential enhancements
38       - currently only supports trace events.  Add other tracers.
39    """
40    _TRACE_ROOT = '/sys/kernel/debug/tracing'
41    _TRACE_ON_PATH = os.path.join(_TRACE_ROOT, 'tracing_on')
42
43    def __init__(self, flush=True, events=None, on=True):
44        """Constructor for KernelTrace class"""
45        self._buffer = []
46        self._buffer_ptr = 0
47        self._events = []
48        self._on = on
49
50        if flush:
51            self.flush()
52        for event in events:
53            if self.event_on(event):
54                self._events.append(event)
55        if on:
56            self.on()
57
58
59    def __del__(self, flush=True, events=None, on=True):
60        """Deconstructor for KernelTrace class"""
61        for event in self._events:
62            self.event_off(event)
63        if self._on:
64            self.off()
65
66
67    def _onoff(self, val):
68        """Turn tracing on or off.
69
70        Arguments:
71            val: integer, 1 for on, 0 for off
72
73        Raises:
74            error.TestFail: If unable to turn tracing on or off.
75        """
76        utils.write_one_line(self._TRACE_ON_PATH, val)
77        result = int(utils.read_one_line(self._TRACE_ON_PATH).strip())
78        if not result == val:
79            raise error.TestFail("Unable to %sable tracing" %
80                                 'en' if val == 1 else 'dis')
81
82
83    def on(self):
84        """Enable tracing."""
85        return self._onoff(1)
86
87
88    def off(self):
89        """Disable tracing."""
90        self._onoff(0)
91
92
93    def is_tracing(self):
94        """Is tracing on?
95
96        Returns:
97            True if tracing enabled and at least one event is enabled.
98        """
99        result = int(utils.read_one_line(self._TRACE_ON_PATH).strip())
100        if result == 1 and len(self._events) > 0:
101            return True
102        return False
103
104
105    def _event_onoff(self, event, val):
106        """Enable/Disable tracing event.
107
108        TODO(tbroch) Consider allowing wild card enabling of trace events via
109            /sys/kernel/debug/tracing/set_event although it makes filling buffer
110            really easy
111
112        Arguments:
113            event: list of events.
114                   See kernel(Documentation/trace/events.txt) for formatting.
115            val: integer, 1 for on, 0 for off
116
117         Returns:
118            True if success, false otherwise
119        """
120        logging.debug("event_onoff: event:%s val:%d", event, val)
121        event_path = event.replace(':', '/')
122        fname = os.path.join(self._TRACE_ROOT, 'events', event_path, 'enable')
123
124        if not os.path.exists(fname):
125            logging.warning("Unable to locate tracing event %s", fname)
126            return False
127        utils.write_one_line(fname, val)
128
129        fname = os.path.join(self._TRACE_ROOT, "set_event")
130        found = False
131        with open(fname) as fd:
132            for ln in fd.readlines():
133                logging.debug("set_event ln:%s", ln)
134                if re.findall(event, ln):
135                    found = True
136                    break
137
138        if val == 1 and not found:
139            logging.warning("Event %s not enabled", event)
140            return False
141
142        if val == 0 and found:
143            logging.warning("Event %s not disabled", event)
144            return False
145
146        return True
147
148
149    def event_on(self, event):
150        return self._event_onoff(event, 1)
151
152
153    def event_off(self, event):
154        return self._event_onoff(event, 0)
155
156
157    def flush(self):
158        """Flush trace buffer.
159
160        Raises:
161            error.TestFail: If unable to flush
162        """
163        self.off()
164        fname = os.path.join(self._TRACE_ROOT, 'free_buffer')
165        utils.write_one_line(fname, 1)
166        self._buffer_ptr = 0
167
168        fname = os.path.join(self._TRACE_ROOT, 'buffer_size_kb')
169        result = utils.read_one_line(fname).strip()
170        if result is '0':
171            return True
172        return False
173
174
175    def read(self, regexp=None):
176        fname = os.path.join(self._TRACE_ROOT, 'trace')
177        fd = open(fname)
178        fd.seek(self._buffer_ptr)
179        for ln in fd.readlines():
180            if regexp is None:
181                self._buffer.append(ln)
182                continue
183            results = re.findall(regexp, ln)
184            if results:
185                logging.debug(ln)
186                self._buffer.append(results[0])
187        self._buffer_ptr = fd.tell()
188        fd.close()
189        return self._buffer
190
191
192    @staticmethod
193    def uptime_secs():
194        results = utils.read_one_line("/proc/uptime")
195        return float(results.split()[0])
196