# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" Trace Parser Module """

import numpy as np
import os
import pandas as pd
import sys
import trappy
import json
import warnings
import operator
import logging

from analysis_register import AnalysisRegister
from collections import namedtuple
from devlib.utils.misc import memoized
from trappy.utils import listify, handle_duplicate_index


NON_IDLE_STATE = -1
ResidencyTime = namedtuple('ResidencyTime', ['total', 'active'])
ResidencyData = namedtuple('ResidencyData', ['label', 'residency'])

class Trace(object):
    """
    The Trace object is the LISA trace events parser.

    :param platform: a dictionary containing information about the target
        platform
    :type platform: dict

    :param data_dir: folder containing all trace data
    :type data_dir: str

    :param events: events to be parsed (everything in the trace by default)
    :type events: list(str)

    :param tasks: filter data for the specified tasks only. If None (default),
        use data for all tasks found in the trace.
    :type tasks: list(str) or NoneType

    :param window: time window to consider when parsing the trace
    :type window: tuple(int, int)

    :param normalize_time: normalize trace time stamps
    :type normalize_time: bool

    :param trace_format: format of the trace. Possible values are:
        - FTrace
        - SysTrace
    :type trace_format: str

    :param plots_dir: directory where to save plots
    :type plots_dir: str

    :param plots_prefix: prefix for plots file names
    :type plots_prefix: str

    :param cgroup_info: add cgroup information for sanitization
      example:
        {
            'controller_ids': { 2: 'schedtune', 4: 'cpuset' },
            'cgroups': [ 'root', 'background', 'foreground' ],  # list of allowed cgroup names
        }
    :type cgroup_info: dict
    """

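    # A minimal usage sketch; the platform dict and trace path below are
    # hypothetical:
    #
    #   platform = {
    #       'clusters': {'little': [0, 1], 'big': [2, 3]},
    #       'cpus_count': 4,
    #   }
    #   trace = Trace(platform, '/path/to/trace_dir', events=['sched_switch'])
    #   switch_df = trace.data_frame.trace_event('sched_switch')
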
    def __init__(self, platform, data_dir, events=None,
                 tasks=None, window=(0, None),
                 normalize_time=True,
                 trace_format='FTrace',
                 plots_dir=None,
                 plots_prefix='',
                 cgroup_info={}):

        # The platform used to run the experiments
        self.platform = platform

        # TRAPpy Trace object
        self.ftrace = None

        # Trace format
        self.trace_format = trace_format

        # The time window used to limit trace parsing to
        self.window = window

        # Dynamically registered TRAPpy events
        self.trappy_cls = {}

        # Maximum timespan for all collected events
        self.time_range = 0
        # Time the system was overutilized
        self.overutilized_time = 0
        self.overutilized_prc = 0

        # The dictionary of task descriptors available in the dataset
        self.tasks = {}

        # List of events required by user
        self.events = []

        # List of events available in the parsed trace
        self.available_events = []

        # Cluster frequency coherency flag
        self.freq_coherency = True

        # Folder containing all trace data
        self.data_dir = None

        # Setup logging
        self._log = logging.getLogger('Trace')

        # Folder containing trace
        if not os.path.isdir(data_dir):
            self.data_dir = os.path.dirname(data_dir)
        else:
            self.data_dir = data_dir

        # By default, use the trace dir to save plots
        self.plots_dir = plots_dir
        if self.plots_dir is None:
            self.plots_dir = self.data_dir
        self.plots_prefix = plots_prefix

        # Cgroup info for sanitization
        self.cgroup_info = cgroup_info

        if events:
            self.__registerTraceEvents(events)
        self.__parseTrace(data_dir, tasks, window, normalize_time,
                          trace_format)
        self.__computeTimeSpan()

        # Minimum and Maximum x_time to use for all plots
        self.x_min = 0
        self.x_max = self.time_range

        # Set the x axis time range based on the parsing window
        t_min = self.window[0]
        t_max = self.window[1]
        self.setXTimeRange(t_min, t_max)

        self.data_frame = TraceData()
        self._registerDataFrameGetters(self)

        self.analysis = AnalysisRegister(self)

    def _registerDataFrameGetters(self, module):
        """
        Internal utility function that looks up getter functions with a
        "_dfg_" prefix in their name and binds them to the data_frame
        accessor object.

        :param module: module whose "_dfg_" getters will be registered
        :type module: class
        """
        self._log.debug('Registering [%s] local data frames', module)
        for func in dir(module):
            if not func.startswith('_dfg_'):
                continue
            dfg_name = func.replace('_dfg_', '')
            dfg_func = getattr(module, func)
            self._log.debug('   %s', dfg_name)
            setattr(self.data_frame, dfg_name, dfg_func)

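    # For instance, the '_dfg_trace_event' getter defined below is exposed to
    # clients as 'trace.data_frame.trace_event(event_name)'.
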
    def setXTimeRange(self, t_min=None, t_max=None):
        """
        Set x axis time range to the specified values.

        :param t_min: lower bound
        :type t_min: int or float

        :param t_max: upper bound
        :type t_max: int or float
        """
        if t_min is None:
            self.x_min = 0
        else:
            self.x_min = t_min
        if t_max is None:
            self.x_max = self.time_range
        else:
            self.x_max = t_max
        self._log.debug('Set plots time range to (%.6f, %.6f)[s]',
                        self.x_min, self.x_max)

    def __registerTraceEvents(self, events):
        """
        Save a copy of the events to be parsed.

        :param events: single event name or list of event names
        :type events: str or list(str)
        """
        if isinstance(events, basestring):
            self.events = events.split(' ')
        elif isinstance(events, list):
            self.events = events
        else:
            raise ValueError('Events must be a string or a list of strings')
        # Register devlib fake cpu_frequency events
        if 'cpu_frequency' in events:
            self.events.append('cpu_frequency_devlib')

    def __parseTrace(self, path, tasks, window, normalize_time, trace_format):
        """
        Internal method in charge of performing the actual parsing of the
        trace.

        :param path: path to the trace folder (or trace file)
        :type path: str

        :param tasks: filter data for the specified tasks only
        :type tasks: list(str)

        :param window: time window to consider when parsing the trace
        :type window: tuple(int, int)

        :param normalize_time: normalize trace time stamps
        :type normalize_time: bool

        :param trace_format: format of the trace. Possible values are:
            - FTrace
            - SysTrace
        :type trace_format: str
        """
        self._log.debug('Loading [sched] events from trace in [%s]...', path)
        self._log.debug('Parsing events: %s', self.events if self.events else 'ALL')
        if trace_format.upper() == 'SYSTRACE' or path.endswith('html'):
            self._log.debug('Parsing SysTrace format...')
            trace_class = trappy.SysTrace
            self.trace_format = 'SysTrace'
        elif trace_format.upper() == 'FTRACE':
            self._log.debug('Parsing FTrace format...')
            trace_class = trappy.FTrace
            self.trace_format = 'FTrace'
        else:
            raise ValueError("Unknown trace format {}".format(trace_format))

        scope = 'custom' if self.events else 'all'
        self.ftrace = trace_class(path, scope=scope, events=self.events,
                                  window=window, normalize_time=normalize_time)

        # Load functions profiling data
        has_function_stats = self._loadFunctionsStats(path)

        # Check for events available on the parsed trace
        self.__checkAvailableEvents()
        if len(self.available_events) == 0:
            if has_function_stats:
                self._log.info('Trace contains only functions stats')
                return
            raise ValueError('The trace does not contain useful events '
                             'nor function stats')

        # Sanitize cgroup info if any
        self._sanitize_CgroupAttachTask()

        # Sanitization is not possible if platform info is missing
        if self.platform:
            # Setup internal data reference to interesting events/dataframes
            self._sanitize_SchedLoadAvgCpu()
            self._sanitize_SchedLoadAvgTask()
            self._sanitize_SchedCpuCapacity()
            self._sanitize_SchedBoostCpu()
            self._sanitize_SchedBoostTask()
            self._sanitize_SchedEnergyDiff()
            self._sanitize_SchedOverutilized()
            self._sanitize_CpuFrequency()

        self.__loadTasksNames(tasks)

        # Compute plot window
        if not normalize_time:
            start = self.window[0]
            if self.window[1]:
                duration = min(self.ftrace.get_duration(), self.window[1])
            else:
                duration = self.ftrace.get_duration()
            self.window = (self.ftrace.basetime + start,
                           self.ftrace.basetime + duration)

    def __checkAvailableEvents(self, key=""):
        """
        Internal method used to build a list of available events.

        :param key: key to be used for TRAPpy filtering
        :type key: str
        """
        for val in self.ftrace.get_filters(key):
            obj = getattr(self.ftrace, val)
            if len(obj.data_frame):
                self.available_events.append(val)
        self._log.debug('Events found on trace:')
        for evt in self.available_events:
            self._log.debug(' - %s', evt)

    def __loadTasksNames(self, tasks):
        """
        Try to load task names using one of the supported events.

        :param tasks: list of task names. If None, load all tasks found.
        :type tasks: list(str) or NoneType
        """
        def load(tasks, event, name_key, pid_key):
            df = self._dfg_trace_event(event)
            if tasks is None:
                tasks = df[name_key].unique()
            self.getTasks(df, tasks, name_key=name_key, pid_key=pid_key)
            self._scanTasks(df, name_key=name_key, pid_key=pid_key)
            self._scanTgids(df)

        if 'sched_switch' in self.available_events:
            load(tasks, 'sched_switch', 'next_comm', 'next_pid')
        elif 'sched_load_avg_task' in self.available_events:
            load(tasks, 'sched_load_avg_task', 'comm', 'pid')
        else:
            self._log.warning('Failed to load task names from trace events')

    def hasEvents(self, dataset):
        """
        Returns True if the specified event is present in the parsed trace,
        False otherwise.

        :param dataset: trace event name
        :type dataset: str
        """
        return dataset in self.available_events

    def __computeTimeSpan(self):
        """
        Compute time axis range, considering all the parsed events.
        """
        ts = sys.maxint
        te = 0

        for events in self.available_events:
            df = self._dfg_trace_event(events)
            if len(df) == 0:
                continue
            if (df.index[0]) < ts:
                ts = df.index[0]
            if (df.index[-1]) > te:
                te = df.index[-1]
            self.time_range = te - ts

        self._log.debug('Collected events span a %.3f [s] time interval',
                        self.time_range)

        # Build a stat on trace overutilization
        if self.hasEvents('sched_overutilized'):
            df = self._dfg_trace_event('sched_overutilized')
            self.overutilized_time = df[df.overutilized == 1].len.sum()
            self.overutilized_prc = 100. * self.overutilized_time / self.time_range

            self._log.debug('Overutilized time: %.6f [s] (%.3f%% of trace time)',
                            self.overutilized_time, self.overutilized_prc)

    def _scanTgids(self, df):
        if '__tgid' not in df.columns:
            return
        df = df[['__pid', '__tgid']]
        df = df.drop_duplicates(keep='first').set_index('__pid')
        df.rename(columns={'__pid': 'pid', '__tgid': 'tgid'},
                  inplace=True)
        self._pid_tgid = df

    def _scanTasks(self, df, name_key='comm', pid_key='pid'):
        """
        Extract task names and PIDs from the input data frame. The data frame
        should contain a task name column and a PID column.

        :param df: data frame containing trace events from which task names
            and PIDs will be extracted
        :type df: :mod:`pandas.DataFrame`

        :param name_key: the name of the dataframe column containing task
            names
        :type name_key: str

        :param pid_key: the name of the dataframe column containing task PIDs
        :type pid_key: str
        """
        df = df[[name_key, pid_key]].drop_duplicates()
        self._tasks_by_name = df.set_index(name_key)
        self._tasks_by_pid = df.set_index(pid_key)

    def getTaskByName(self, name):
        """
        Get the PIDs of all tasks with the specified name.

        :param name: task name
        :type name: str
        """
        if name not in self._tasks_by_name.index:
            return []
        if len(self._tasks_by_name.ix[name].values) > 1:
            return list({task[0] for task in
                         self._tasks_by_name.ix[name].values})
        return [self._tasks_by_name.ix[name].values[0]]

    def getTaskByPid(self, pid):
        """
        Get the names of all tasks with the specified PID.

        :param pid: task PID
        :type pid: int
        """
        if pid not in self._tasks_by_pid.index:
            return []
        if len(self._tasks_by_pid.ix[pid].values) > 1:
            return list({task[0] for task in
                         self._tasks_by_pid.ix[pid].values})
        return [self._tasks_by_pid.ix[pid].values[0]]

    def getTgidFromPid(self, pid):
        return self._pid_tgid.ix[pid].values[0]

    def getTasks(self, dataframe=None,
                 task_names=None, name_key='comm', pid_key='pid'):
        """
        Helper function to get the PIDs of specified tasks.

        This method can take a Pandas dataframe as input, from which the PIDs
        of the specified tasks are filtered out. If a dataframe is not
        provided, previously filtered PIDs are returned.

        If a list of task names is not provided, all tasks detected in the
        trace will be used. The specified dataframe must provide at least two
        columns reporting the task name and the task PID. The default names of
        these columns can be overridden using the provided parameters.

        :param dataframe: A Pandas dataframe containing at least 'name_key'
            and 'pid_key' columns. If None, previously filtered PIDs are
            returned.
        :type dataframe: :mod:`pandas.DataFrame`

        :param task_names: The list of tasks to get the PID of (default: all
            tasks)
        :type task_names: list(str)

        :param name_key: The name of the dataframe column containing task
            names
        :type name_key: str

        :param pid_key: The name of the dataframe column containing task PIDs
        :type pid_key: str
        """
        if task_names is None:
            task_names = self.tasks.keys()
        if dataframe is None:
            return {k: v for k, v in self.tasks.iteritems() if k in task_names}
        df = dataframe
        self._log.debug('Lookup dataset for tasks...')
        for tname in task_names:
            self._log.debug('Lookup for task [%s]...', tname)
            results = df[df[name_key] == tname][[name_key, pid_key]]
            if len(results) == 0:
                self._log.error('  task %16s NOT found', tname)
                continue
            (name, pid) = results.head(1).values[0]
            if name != tname:
                self._log.error('  task %16s NOT found', tname)
                continue
            if tname not in self.tasks:
                self.tasks[tname] = {}
            pids = list(results[pid_key].unique())
            self.tasks[tname]['pid'] = pids
            self._log.debug('  task %16s found, pid: %s',
                            tname, self.tasks[tname]['pid'])
        return self.tasks

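    # Illustrative usage: once the trace has been parsed, PIDs of known tasks
    # can be looked up by name (the task name here is just an example):
    #
    #   trace.getTasks(task_names=['kswapd0'])
    #   # -> {'kswapd0': {'pid': [...]}}
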

###############################################################################
# DataFrame Getter Methods
###############################################################################

    def df(self, event):
        """
        Get a dataframe containing all occurrences of the specified trace
        event in the parsed trace.

        :param event: Trace event name
        :type event: str
        """
        warnings.simplefilter('always', DeprecationWarning)  # turn off filter
        warnings.warn("\n\tUse of Trace::df() is deprecated and will be soon removed."
                      "\n\tUse Trace::data_frame.trace_event(event_name) instead.",
                      category=DeprecationWarning)
        warnings.simplefilter('default', DeprecationWarning)  # reset filter
        return self._dfg_trace_event(event)

    def _dfg_trace_event(self, event):
        """
        Get a dataframe containing all occurrences of the specified trace
        event in the parsed trace.

        :param event: Trace event name
        :type event: str
        """
        if self.data_dir is None:
            raise ValueError("trace data not (yet) loaded")
        if self.ftrace and hasattr(self.ftrace, event):
            return getattr(self.ftrace, event).data_frame
        raise ValueError('Event [{}] not supported. '
                         'Supported events are: {}'
                         .format(event, self.available_events))

    def _dfg_functions_stats(self, functions=None):
        """
        Get a DataFrame of specified kernel functions profile data.

        For each profiled function a DataFrame is returned which reports stats
        on kernel functions execution time. The reported stats are per-CPU and
        include: number of times the function has been executed (hits),
        average execution time (avg), overall execution time (time) and
        samples variance (s_2).
        By default returns a DataFrame of all the functions profiled.

        :param functions: the name of the function or a list of function names
                          to report
        :type functions: str or list(str)
        """
        if not hasattr(self, '_functions_stats_df'):
            return None
        df = self._functions_stats_df
        if not functions:
            return df
        return df.loc[df.index.get_level_values(1).isin(listify(functions))]

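    # Hypothetical usage, assuming a 'trace.stats' profiling file was found
    # alongside the trace (the function name is just an example):
    #
    #   stats = trace.data_frame.functions_stats('pick_next_task_fair')
    #   print stats[['hits', 'avg']]
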
    # cgroup_attach_task data frame with fake (devlib) and real events merged
    def _cgroup_attach_task(self):
        cgroup_events = ['cgroup_attach_task', 'cgroup_attach_task_devlib']
        df = None

        if set(cgroup_events).isdisjoint(set(self.available_events)):
            self._log.error('at least one of {} is needed for cgroup_attach_task event generation'.format(cgroup_events))
            return None

        for cev in cgroup_events:
            if cev not in self.available_events:
                continue
            cdf = self._dfg_trace_event(cev)
            cdf = cdf[['__line', 'pid', 'controller', 'cgroup']]
            if not isinstance(df, pd.DataFrame):
                df = cdf
            else:
                df = pd.concat([cdf, df])

        # Always drop na since this DF is used as secondary
        df.dropna(inplace=True, how='any')
        return df

    @memoized
    def _dfg_cgroup_attach_task(self, controllers=['schedtune', 'cpuset']):
        # Since fork doesn't result in attach events, generate fake attach
        # events. The mechanism below doesn't propagate across nested fork
        # levels. For example:
        # cgroup_attach_task: pid=1166
        # fork: pid=1166 child_pid=2222  <-- fake attach generated
        # fork: pid=2222 child_pid=3333  <-- fake attach not generated
        def fork_add_cgroup(fdf, cdf, controller):
            cdf = cdf[cdf['controller'] == controller]
            ret_df = trappy.utils.merge_dfs(fdf, cdf, pivot='pid')
            return ret_df

        if 'sched_process_fork' not in self.available_events:
            self._log.error('sched_process_fork is mandatory to get proper cgroup_attach events')
            return None
        fdf = self._dfg_trace_event('sched_process_fork')

        forks_len = len(fdf)
        forkdf = fdf
        cdf = self._cgroup_attach_task()
        for idx, c in enumerate(controllers):
            fdf = fork_add_cgroup(fdf, cdf, c)
            if (idx != (len(controllers) - 1)):
                fdf = pd.concat([fdf, forkdf]).sort_values(by='__line')

        fdf = fdf[['__line', 'child_pid', 'controller', 'cgroup']]
        fdf.rename(columns={'child_pid': 'pid'}, inplace=True)

        # Always drop na since this DF is used as secondary
        fdf.dropna(inplace=True, how='any')

        new_forks_len = len(fdf) / len(controllers)

        fdf = pd.concat([fdf, cdf]).sort_values(by='__line')

        if new_forks_len < forks_len:
            dropped = forks_len - new_forks_len
            self._log.info("Couldn't generate fake cgroup attach events for all forks ({} dropped)".format(dropped))
        return fdf

    @memoized
    def _dfg_sched_switch_cgroup(self, controllers=['schedtune', 'cpuset']):
        def sched_switch_add_cgroup(sdf, cdf, controller, direction):
            cdf = cdf[cdf['controller'] == controller]

            ret_df = sdf.rename(columns={direction + '_pid': 'pid'})
            ret_df = trappy.utils.merge_dfs(ret_df, cdf, pivot='pid')
            ret_df.rename(columns={'pid': direction + '_pid'}, inplace=True)

            ret_df.drop('controller', axis=1, inplace=True)
            ret_df.rename(columns={'cgroup': direction + '_' + controller},
                          inplace=True)
            return ret_df

        if 'sched_switch' not in self.available_events:
            self._log.error('sched_switch is mandatory to generate sched_switch_cgroup event')
            return None
        sdf = self._dfg_trace_event('sched_switch')
        cdf = self._dfg_cgroup_attach_task()

        for c in controllers:
            sdf = sched_switch_add_cgroup(sdf, cdf, c, 'next')
            sdf = sched_switch_add_cgroup(sdf, cdf, c, 'prev')

        # Augment with TGID information
        sdf = sdf.join(self._pid_tgid, on='next_pid').rename(columns={'tgid': 'next_tgid'})
        sdf = sdf.join(self._pid_tgid, on='prev_pid').rename(columns={'tgid': 'prev_tgid'})

        df = self._tasks_by_pid.rename(columns={'next_comm': 'comm'})
        sdf = sdf.join(df, on='next_tgid').rename(columns={'comm': 'next_tgid_comm'})
        sdf = sdf.join(df, on='prev_tgid').rename(columns={'comm': 'prev_tgid_comm'})
        return sdf

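    # A sketch of how clients might use the getter above (requires a trace
    # with sched_switch and cgroup attach events):
    #
    #   df = trace.data_frame.sched_switch_cgroup(['schedtune'])
    #   # sched_switch augmented with next_schedtune/prev_schedtune and
    #   # next_tgid/prev_tgid columns
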
###############################################################################
# Trace Events Sanitize Methods
###############################################################################

    def _sanitize_SchedCpuCapacity(self):
        """
        Add more columns to cpu_capacity data frame if the energy model is
        available.
        """
        if not self.hasEvents('cpu_capacity') \
           or 'nrg_model' not in self.platform:
            return

        df = self._dfg_trace_event('cpu_capacity')

        # Add column with LITTLE and big CPUs max capacities
        nrg_model = self.platform['nrg_model']
        max_lcap = nrg_model['little']['cpu']['cap_max']
        max_bcap = nrg_model['big']['cpu']['cap_max']
        df['max_capacity'] = np.select(
                [df.cpu.isin(self.platform['clusters']['little'])],
                [max_lcap], max_bcap)
        # Add LITTLE and big CPUs "tipping point" threshold
        tip_lcap = 0.8 * max_lcap
        tip_bcap = 0.8 * max_bcap
        df['tip_capacity'] = np.select(
                [df.cpu.isin(self.platform['clusters']['little'])],
                [tip_lcap], tip_bcap)

    def _sanitize_SchedLoadAvgCpu(self):
        """
        If necessary, rename certain signal names from v5.0 to v5.1 format.
        """
        if not self.hasEvents('sched_load_avg_cpu'):
            return
        df = self._dfg_trace_event('sched_load_avg_cpu')
        if 'utilization' in df:
            df.rename(columns={'utilization': 'util_avg'}, inplace=True)
            df.rename(columns={'load': 'load_avg'}, inplace=True)

    def _sanitize_SchedLoadAvgTask(self):
        """
        If necessary, rename certain signal names from v5.0 to v5.1 format.
        """
        if not self.hasEvents('sched_load_avg_task'):
            return
        df = self._dfg_trace_event('sched_load_avg_task')
        if 'utilization' in df:
            df.rename(columns={'utilization': 'util_avg'}, inplace=True)
            df.rename(columns={'load': 'load_avg'}, inplace=True)
            df.rename(columns={'avg_period': 'period_contrib'}, inplace=True)
            df.rename(columns={'runnable_avg_sum': 'load_sum'}, inplace=True)
            df.rename(columns={'running_avg_sum': 'util_sum'}, inplace=True)
        df['cluster'] = np.select(
                [df.cpu.isin(self.platform['clusters']['little'])],
                ['LITTLE'], 'big')
        # Add a column which represents the max capacity of the smallest
        # cluster which can accommodate the task utilization
        little_cap = self.platform['nrg_model']['little']['cpu']['cap_max']
        big_cap = self.platform['nrg_model']['big']['cpu']['cap_max']
        df['min_cluster_cap'] = df.util_avg.map(
            lambda util_avg: big_cap if util_avg > little_cap else little_cap
        )

    def _sanitize_SchedBoostCpu(self):
        """
        Add a boosted utilization signal as the sum of utilization and margin.

        Also, if necessary, rename certain signal names from v5.0 to v5.1
        format.
        """
        if not self.hasEvents('sched_boost_cpu'):
            return
        df = self._dfg_trace_event('sched_boost_cpu')
        if 'usage' in df:
            df.rename(columns={'usage': 'util'}, inplace=True)
        df['boosted_util'] = df['util'] + df['margin']

    def _sanitize_SchedBoostTask(self):
        """
        Add a boosted utilization signal as the sum of utilization and margin.

        Also, if necessary, rename certain signal names from v5.0 to v5.1
        format.
        """
        if not self.hasEvents('sched_boost_task'):
            return
        df = self._dfg_trace_event('sched_boost_task')
        if 'utilization' in df:
            # Convert signal names to the v5.1 format
            df.rename(columns={'utilization': 'util'}, inplace=True)
        df['boosted_util'] = df['util'] + df['margin']

    def _sanitize_SchedEnergyDiff(self):
        """
        If an energy model is provided, some signals are added to the
        sched_energy_diff trace event data frame.

        Also convert between existing field name formats for
        sched_energy_diff.
        """
        if not self.hasEvents('sched_energy_diff') \
           or 'nrg_model' not in self.platform:
            return
        nrg_model = self.platform['nrg_model']
        em_lcluster = nrg_model['little']['cluster']
        em_bcluster = nrg_model['big']['cluster']
        em_lcpu = nrg_model['little']['cpu']
        em_bcpu = nrg_model['big']['cpu']
        lcpus = len(self.platform['clusters']['little'])
        bcpus = len(self.platform['clusters']['big'])
        SCHED_LOAD_SCALE = 1024

        power_max = em_lcpu['nrg_max'] * lcpus + em_bcpu['nrg_max'] * bcpus + \
            em_lcluster['nrg_max'] + em_bcluster['nrg_max']
        self._log.debug(
            "Maximum estimated system energy: {0:d}".format(power_max))

        df = self._dfg_trace_event('sched_energy_diff')

        translations = {'nrg_d': 'nrg_diff',
                        'utl_d': 'usage_delta',
                        'payoff': 'nrg_payoff'
        }
        df.rename(columns=translations, inplace=True)

        df['nrg_diff_pct'] = SCHED_LOAD_SCALE * df.nrg_diff / power_max

        # Tag columns by usage_delta
        ccol = df.usage_delta
        df['usage_delta_group'] = np.select(
            [ccol < 150, ccol < 400, ccol < 600],
            ['< 150', '< 400', '< 600'], '>= 600')

        # Tag columns by nrg_payoff
        ccol = df.nrg_payoff
        df['nrg_payoff_group'] = np.select(
            [ccol > 2e9, ccol > 0, ccol > -2e9],
            ['Optimal Accept', 'SchedTune Accept', 'SchedTune Reject'],
            'Suboptimal Reject')

    def _sanitize_SchedOverutilized(self):
        """ Add a column with overutilized status duration. """
        if not self.hasEvents('sched_overutilized'):
            return
        df = self._dfg_trace_event('sched_overutilized')
        df['start'] = df.index
        df['len'] = (df.start - df.start.shift()).fillna(0).shift(-1)
        df.drop('start', axis=1, inplace=True)

    # Sanitize cgroup information helper
    def _helper_sanitize_CgroupAttachTask(self, df, allowed_cgroups,
                                          controller_id_name):
        # Drop rows that aren't in the root-id -> name map
        df = df[df['dst_root'].isin(controller_id_name.keys())]

        def get_cgroup_name(path, valid_names):
            name = os.path.basename(path)
            name = 'root' if name not in valid_names else name
            return name

        def get_cgroup_names(rows):
            ret = []
            for r in rows.iterrows():
                ret.append(get_cgroup_name(r[1]['dst_path'], allowed_cgroups))
            return ret

        def get_controller_names(rows):
            ret = []
            for r in rows.iterrows():
                ret.append(controller_id_name[r[1]['dst_root']])
            return ret

        # Sanitize cgroup names.
        # The cgroup column isn't in mainline, so add it in. It's already
        # added by some out-of-tree kernels, so check first.
        if 'cgroup' not in df.columns:
            if 'dst_path' not in df.columns:
                raise RuntimeError("Can't sanitize cgroup DF, need dst_path")
            df = df.assign(cgroup=get_cgroup_names)

        # Sanitize controller names
        if 'controller' not in df.columns:
            if 'dst_root' not in df.columns:
                raise RuntimeError("Can't sanitize cgroup DF, need dst_root")
            df = df.assign(controller=get_controller_names)

        return df

    def _sanitize_CgroupAttachTask(self):
        def sanitize_cgroup_event(name):
            if name not in self.available_events:
                return

            df = self._dfg_trace_event(name)

            if len(df.groupby(level=0).filter(lambda x: len(x) > 1)) > 0:
                self._log.warning('Timestamp collisions seen in {} event!'.format(name))

            df = self._helper_sanitize_CgroupAttachTask(
                df, self.cgroup_info['cgroups'],
                self.cgroup_info['controller_ids'])
            getattr(self.ftrace, name).data_frame = df
        sanitize_cgroup_event('cgroup_attach_task')
        sanitize_cgroup_event('cgroup_attach_task_devlib')

    def _chunker(self, seq, size):
        """
        Given a data frame or a series, generate a sequence of chunks of the
        given size.

        :param seq: data to be split into chunks
        :type seq: :mod:`pandas.Series` or :mod:`pandas.DataFrame`

        :param size: size of each chunk
        :type size: int
        """
        return (seq.iloc[pos:pos + size] for pos in range(0, len(seq), size))

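    # For example, with a 4-CPU cluster, _chunker(cluster_df, 4) yields
    # consecutive 4-row windows; the frequency coherency check below expects
    # each window to hold one cpu_frequency event per CPU of that cluster.
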
    def _sanitize_CpuFrequency(self):
        """
        Verify that all platform reported clusters are frequency coherent
        (i.e. frequency scaling is performed at a cluster level).
        """
        if not self.hasEvents('cpu_frequency_devlib'):
            return

        devlib_freq = self._dfg_trace_event('cpu_frequency_devlib')
        devlib_freq.rename(columns={'cpu_id': 'cpu'}, inplace=True)
        devlib_freq.rename(columns={'state': 'frequency'}, inplace=True)

        df = self._dfg_trace_event('cpu_frequency')
        clusters = self.platform['clusters']

        # devlib always introduces fake cpu_frequency events; in case the
        # OS has not generated cpu_frequency events, these are the only
        # frequency events to report
        if len(df) == 0:
            # Register devlib injected events as 'cpu_frequency' events
            setattr(self.ftrace.cpu_frequency, 'data_frame', devlib_freq)
            df = devlib_freq
            self.available_events.append('cpu_frequency')

        # make sure fake cpu_frequency events are never interleaved with
        # OS generated events
        else:
            if len(devlib_freq) > 0:

                # Frequency injection is done on a per-cluster basis, under
                # the assumption that clusters are frequency coherent.
                # For each cluster we inject devlib events only if they do
                # not overlap with OS-generated ones.

                # Inject "initial" devlib frequencies
                os_df = df
                dl_df = devlib_freq.iloc[:self.platform['cpus_count']]
                for _, c in self.platform['clusters'].iteritems():
                    dl_freqs = dl_df[dl_df.cpu.isin(c)]
                    os_freqs = os_df[os_df.cpu.isin(c)]
                    self._log.debug("First freqs for %s:\n%s", c, dl_freqs)
                    # All devlib events "before" os-generated events
                    self._log.debug("Min os freq @: %s", os_freqs.index.min())
                    if os_freqs.empty or \
                       os_freqs.index.min() > dl_freqs.index.max():
                        self._log.debug("Insert devlib freqs for %s", c)
                        df = pd.concat([dl_freqs, df])

                # Inject "final" devlib frequencies
                os_df = df
                dl_df = devlib_freq.iloc[self.platform['cpus_count']:]
                for _, c in self.platform['clusters'].iteritems():
                    dl_freqs = dl_df[dl_df.cpu.isin(c)]
                    os_freqs = os_df[os_df.cpu.isin(c)]
                    self._log.debug("Last freqs for %s:\n%s", c, dl_freqs)
                    # All devlib events "after" os-generated events
                    self._log.debug("Max os freq @: %s", os_freqs.index.max())
                    if os_freqs.empty or \
                       os_freqs.index.max() < dl_freqs.index.min():
                        self._log.debug("Append devlib freqs for %s", c)
                        df = pd.concat([df, dl_freqs])

                df.sort_index(inplace=True)

            setattr(self.ftrace.cpu_frequency, 'data_frame', df)

        # Frequency Coherency Check
        for _, cpus in clusters.iteritems():
            cluster_df = df[df.cpu.isin(cpus)]
            for chunk in self._chunker(cluster_df, len(cpus)):
                f = chunk.iloc[0].frequency
                if any(chunk.frequency != f):
                    self._log.warning('Cluster frequency is not coherent! '
                                      'Failure in [cpu_frequency] events at:')
                    self._log.warning(chunk)
                    self.freq_coherency = False
                    return
        self._log.info('Platform clusters verified to be frequency coherent')

###############################################################################
# Utility Methods
###############################################################################

    def integrate_square_wave(self, sq_wave):
        """
        Compute the integral of a square wave time series.

        :param sq_wave: square wave assuming only 1.0 and 0.0 values
        :type sq_wave: :mod:`pandas.Series`
        """
        sq_wave.iloc[-1] = 0.0
        # Compact signal to obtain only 1-0-1-0 sequences
        comp_sig = sq_wave.loc[sq_wave.shift() != sq_wave]
        # First value for computing the difference must be a 1
        if comp_sig.iloc[0] == 0.0:
            return sum(comp_sig.iloc[2::2].index - comp_sig.iloc[1:-1:2].index)
        else:
            return sum(comp_sig.iloc[1::2].index - comp_sig.iloc[:-1:2].index)

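    # Worked example (a sketch): a wave that is high over [0s, 2s) and
    # [3s, 5s), i.e. pd.Series([1., 0., 1., 0.], index=[0., 2., 3., 5.]),
    # integrates to (2 - 0) + (5 - 3) = 4.0 seconds.
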
    def _loadFunctionsStats(self, path='trace.stats'):
        """
        Read functions profiling file and build a data frame containing all
        relevant data.

        :param path: path to the functions profiling trace file
        :type path: str
        """
        if os.path.isdir(path):
            path = os.path.join(path, 'trace.stats')
        if path.endswith('dat') or path.endswith('html'):
            pre, ext = os.path.splitext(path)
            path = pre + '.stats'
        if not os.path.isfile(path):
            return False

        # Opening functions profiling JSON data file
        self._log.debug('Loading functions profiling data from [%s]...', path)
        with open(path, 'r') as fh:
            trace_stats = json.load(fh)

        # Build DataFrame of function stats
        frames = {}
        for cpu, data in trace_stats.iteritems():
            frames[int(cpu)] = pd.DataFrame.from_dict(data, orient='index')

        # Build and keep track of the DataFrame
        self._functions_stats_df = pd.concat(frames.values(),
                                             keys=frames.keys())

        return len(self._functions_stats_df) > 0

    @memoized
    def getCPUActiveSignal(self, cpu):
        """
        Build a square wave representing the active (i.e. non-idle) CPU time,
        i.e.:

          cpu_active[t] == 1 if the CPU is reported to be non-idle by cpuidle
          at time t
          cpu_active[t] == 0 otherwise

        :param cpu: CPU ID
        :type cpu: int

        :returns: A :mod:`pandas.Series` or ``None`` if the trace contains no
                  "cpu_idle" events
        """
        if not self.hasEvents('cpu_idle'):
            self._log.warning('Events [cpu_idle] not found, '
                              'cannot compute CPU active signal!')
            return None

        idle_df = self._dfg_trace_event('cpu_idle')
        cpu_df = idle_df[idle_df.cpu_id == cpu]

        cpu_active = cpu_df.state.apply(
            lambda s: 1 if s == NON_IDLE_STATE else 0
        )

        start_time = 0.0
        if not self.ftrace.normalized_time:
            start_time = self.ftrace.basetime

        if cpu_active.empty:
            cpu_active = pd.Series([0], index=[start_time])
        elif cpu_active.index[0] != start_time:
            entry_0 = pd.Series(cpu_active.iloc[0] ^ 1, index=[start_time])
            cpu_active = pd.concat([entry_0, cpu_active])

        # Fix sequences of wakeup/sleep events reported with the same index
        return handle_duplicate_index(cpu_active)

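    # A sketch combining the helpers above (assuming the parsed trace
    # contains cpu_idle events):
    #
    #   active = trace.getCPUActiveSignal(0)
    #   if active is not None:
    #       busy_s = trace.integrate_square_wave(active)
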
    @memoized
    def getClusterActiveSignal(self, cluster):
        """
        Build a square wave representing the active (i.e. non-idle) cluster
        time, i.e.:

          cluster_active[t] == 1 if at least one CPU is reported to be
          non-idle by cpuidle at time t
          cluster_active[t] == 0 otherwise

        :param cluster: list of CPU IDs belonging to a cluster
        :type cluster: list(int)

        :returns: A :mod:`pandas.Series` or ``None`` if the trace contains no
                  "cpu_idle" events
        """
        if not self.hasEvents('cpu_idle'):
            self._log.warning('Events [cpu_idle] not found, '
                              'cannot compute cluster active signal!')
            return None

        active = self.getCPUActiveSignal(cluster[0]).to_frame(name=cluster[0])
        for cpu in cluster[1:]:
            active = active.join(
                self.getCPUActiveSignal(cpu).to_frame(name=cpu),
                how='outer'
            )

        active.fillna(method='ffill', inplace=True)

        # Cluster active is the OR between the actives on each CPU
        # belonging to that specific cluster
        cluster_active = reduce(
            operator.or_,
            [cpu_active.astype(int) for _, cpu_active in
             active.iteritems()]
        )

        return cluster_active


class TraceData:
    """ A DataFrame collector exposed to Trace's clients """
    pass

# vim :set tabstop=4 shiftwidth=4 expandtab