# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" Trace Parser Module """

import numpy as np
import os
import pandas as pd
import sys
import trappy
import json
import warnings
import operator
import logging

from analysis_register import AnalysisRegister
from collections import namedtuple
from devlib.utils.misc import memoized
from trappy.utils import listify, handle_duplicate_index


NON_IDLE_STATE = -1
ResidencyTime = namedtuple('ResidencyTime', ['total', 'active'])
ResidencyData = namedtuple('ResidencyData', ['label', 'residency'])

class Trace(object):
    """
    The Trace object is the LISA trace events parser.

    :param platform: a dictionary containing information about the target
        platform
    :type platform: dict or None

    :param data_dir: folder containing all trace data
    :type data_dir: str

    :param events: events to be parsed (everything in the trace by default)
    :type events: list(str)

    :param tasks: filter data for the specified tasks only
    :type tasks: list(str) or NoneType

    :param window: time window to consider when parsing the trace
    :type window: tuple(int, int)

    :param normalize_time: normalize trace time stamps
    :type normalize_time: bool

    :param trace_format: format of the trace. Possible values are:
        - FTrace
        - SysTrace
    :type trace_format: str

    :param plots_dir: directory where to save plots
    :type plots_dir: str

    :param plots_prefix: prefix for plots file names
    :type plots_prefix: str

    :param cgroup_info: add cgroup information for sanitization
      example:
        {
            'controller_ids': { 2: 'schedtune', 4: 'cpuset' },
            'cgroups': [ 'root', 'background', 'foreground' ],  # list of allowed cgroup names
        }
    :type cgroup_info: dict
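
    Example (sketch; the trace path and event list below are illustrative
    assumptions, not fixed by this module)::

        trace = Trace(platform=None,
                      data_dir='/path/to/trace_dir',
                      events=['sched_switch', 'cpu_frequency'])
        switch_df = trace.data_frame.trace_event('sched_switch')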
78    """
79
80    def __init__(self, platform, data_dir, events,
81                 tasks=None,
82                 window=(0, None),
83                 normalize_time=True,
84                 trace_format='FTrace',
85                 plots_dir=None,
86                 plots_prefix='',
87                 cgroup_info={}):

        # The platform used to run the experiments
        self.platform = platform or {}

        # TRAPpy Trace object
        self.ftrace = None

        # Trace format
        self.trace_format = trace_format

        # The time window used to limit trace parsing to
        self.window = window

        # Whether trace timestamps are normalized or not
        self.normalize_time = normalize_time

        # Dynamically registered TRAPpy events
        self.trappy_cls = {}

        # Maximum timespan for all collected events
        self.time_range = 0

        # Time the system was overutilized
        self.overutilized_time = 0
        self.overutilized_prc = 0

        # List of events required by user
        self.events = []

        # List of events available in the parsed trace
        self.available_events = []

        # Cluster frequency coherency flag
        self.freq_coherency = True

        # Folder containing all trace data
        self.data_dir = None

        # Setup logging
        self._log = logging.getLogger('Trace')

        # Folder containing trace
        if not os.path.isdir(data_dir):
            self.data_dir = os.path.dirname(data_dir) or '.'
        else:
            self.data_dir = data_dir

        # By default, use the trace dir to save plots
        self.plots_dir = plots_dir
        if self.plots_dir is None:
            self.plots_dir = self.data_dir
        self.plots_prefix = plots_prefix

        # Cgroup info for sanitization
        self.cgroup_info = cgroup_info or {}

        if events:
            self.__registerTraceEvents(events)
        self.__parseTrace(data_dir, tasks, window, trace_format)

        # Minimum and Maximum x_time to use for all plots
        self.x_min = 0
        self.x_max = self.time_range

        # Reset x axis time range to full scale
        t_min = self.window[0]
        t_max = self.window[1]
        self.setXTimeRange(t_min, t_max)

        self.data_frame = TraceData()
        self._registerDataFrameGetters(self)

        # If we don't know the number of CPUs, check the trace for the
        # highest-numbered CPU that traced an event.
        if 'cpus_count' not in self.platform:
            max_cpu = max(int(self.data_frame.trace_event(e)['__cpu'].max())
                          for e in self.available_events)
            self.platform['cpus_count'] = max_cpu + 1

        self.analysis = AnalysisRegister(self)

    def _registerDataFrameGetters(self, module):
        """
        Internal utility function that looks up getter functions with a "_dfg_"
        prefix in their name and binds them to the specified module.

        :param module: module to which the function is added
        :type module: class
        """
        self._log.debug('Registering [%s] local data frames', module)
        for func in dir(module):
            if not func.startswith('_dfg_'):
                continue
            dfg_name = func.replace('_dfg_', '')
            dfg_func = getattr(module, func)
            self._log.debug('   %s', dfg_name)
            setattr(self.data_frame, dfg_name, dfg_func)

    def setXTimeRange(self, t_min=None, t_max=None):
        """
        Set x axis time range to the specified values.

        :param t_min: lower bound
        :type t_min: int or float

        :param t_max: upper bound
        :type t_max: int or float
        """
        self.x_min = t_min if t_min is not None else self.start_time
        self.x_max = t_max if t_max is not None else self.start_time + self.time_range

        self._log.debug('Set plots time range to (%.6f, %.6f)[s]',
                        self.x_min, self.x_max)

    def __registerTraceEvents(self, events):
        """
        Save a copy of the parsed events.

        :param events: single event name or list of event names
        :type events: str or list(str)
        """
        if isinstance(events, basestring):
            self.events = events.split(' ')
        elif isinstance(events, list):
            self.events = events
        else:
            raise ValueError('Events must be a string or a list of strings')
        # Register devlib fake cpu_frequency events
        if 'cpu_frequency' in events:
            self.events.append('cpu_frequency_devlib')

    def __parseTrace(self, path, tasks, window, trace_format):
        """
        Internal method in charge of performing the actual parsing of the
        trace.

        :param path: path to the trace folder (or trace file)
        :type path: str

        :param tasks: filter data for the specified tasks only
        :type tasks: list(str)

        :param window: time window to consider when parsing the trace
        :type window: tuple(int, int)

        :param trace_format: format of the trace. Possible values are:
            - FTrace
            - SysTrace
        :type trace_format: str
        """
        self._log.debug('Loading [sched] events from trace in [%s]...', path)
        self._log.debug('Parsing events: %s', self.events if self.events else 'ALL')
        if trace_format.upper() == 'SYSTRACE' or path.endswith('html'):
            self._log.debug('Parsing SysTrace format...')
            trace_class = trappy.SysTrace
            self.trace_format = 'SysTrace'
        elif trace_format.upper() == 'FTRACE':
            self._log.debug('Parsing FTrace format...')
            trace_class = trappy.FTrace
            self.trace_format = 'FTrace'
        else:
            raise ValueError("Unknown trace format {}".format(trace_format))

        # When time is not normalized the window is expressed in absolute
        # timestamps, so TRAPpy's `abs_window` must be used instead of `window`
        window_kw = {}
        if self.normalize_time:
            window_kw['window'] = window
        else:
            window_kw['abs_window'] = window
        scope = 'custom' if self.events else 'all'
        self.ftrace = trace_class(path, scope=scope, events=self.events,
                                  normalize_time=self.normalize_time, **window_kw)

        # Load Functions profiling data
        has_function_stats = self._loadFunctionsStats(path)

        # Check for events available on the parsed trace
        self.__checkAvailableEvents()
        if len(self.available_events) == 0:
            if has_function_stats:
                self._log.info('Trace contains only functions stats')
                return
            raise ValueError('The trace does not contain useful events '
                             'nor function stats')

        # Index PIDs and Task names
        self.__loadTasksNames(tasks)

        self.__computeTimeSpan()

        # Sanitize cgroup info if any
        self._sanitize_CgroupAttachTask()

        # Setup internal data reference to interesting events/dataframes
        self._sanitize_SchedOverutilized()

        # Sanitization of platform-dependent events is not possible if the
        # platform description is missing
        if self.platform:
            self._sanitize_SchedLoadAvgCpu()
            self._sanitize_SchedLoadAvgTask()
            self._sanitize_SchedCpuCapacity()
            self._sanitize_SchedBoostCpu()
            self._sanitize_SchedBoostTask()
            self._sanitize_SchedEnergyDiff()
            self._sanitize_CpuFrequency()

    def __checkAvailableEvents(self, key=""):
        """
        Internal method used to build a list of available events.

        :param key: key to be used for TRAPpy filtering
        :type key: str
        """
        for val in self.ftrace.get_filters(key):
            obj = getattr(self.ftrace, val)
            if len(obj.data_frame):
                self.available_events.append(val)
        self._log.debug('Events found on trace:')
        for evt in self.available_events:
            self._log.debug(' - %s', evt)

    def __loadTasksNames(self, tasks):
        """
        Try to load task names using one of the supported events.

        :param tasks: list of task names. If None, load all tasks found.
        :type tasks: list(str) or NoneType
        """
        def load(tasks, event, name_key, pid_key):
            df = self._dfg_trace_event(event)
            if tasks is None:
                tasks = df[name_key].unique()
            self._scanTasks(df, name_key=name_key, pid_key=pid_key)
            self._scanTgids(df)

        if 'sched_switch' in self.available_events:
            load(tasks, 'sched_switch', 'prev_comm', 'prev_pid')
            return

        if 'sched_load_avg_task' in self.available_events:
            load(tasks, 'sched_load_avg_task', 'comm', 'pid')
            return

        self._log.warning('Failed to load task names from trace events')

    def hasEvents(self, dataset):
        """
        Returns True if the specified event is present in the parsed trace,
        False otherwise.

        :param dataset: trace event name
        :type dataset: str
        """
        return dataset in self.available_events

    def __computeTimeSpan(self):
        """
        Compute time axis range, considering all the parsed events.
        """
        self.start_time = 0 if self.normalize_time else self.ftrace.basetime
        self.time_range = self.ftrace.get_duration()
        self._log.debug('Collected events span a %.3f [s] time interval',
                        self.time_range)

        self.setXTimeRange(self.window[0], self.window[1])

    def _scanTgids(self, df):
        if '__tgid' not in df.columns:
            return
        df = df[['__pid', '__tgid']]
        df = df.drop_duplicates(keep='first').set_index('__pid')
        df.rename(columns = { '__pid': 'pid', '__tgid': 'tgid' },
                              inplace=True)
        self._pid_tgid = df

    def _scanTasks(self, df, name_key='comm', pid_key='pid'):
        """
        Extract task names and PIDs from the input data frame. The data frame
        should contain a task name column and a PID column.

        :param df: data frame containing trace events from which task names
            and PIDs will be extracted
        :type df: :mod:`pandas.DataFrame`

        :param name_key: the name of the dataframe column containing task
            names
        :type name_key: str

        :param pid_key: the name of the dataframe column containing task PIDs
        :type pid_key: str
        """
        df = df[[name_key, pid_key]]
        self._tasks_by_name = df.set_index(name_key)
        self._tasks_by_pid = (df.drop_duplicates(subset=pid_key, keep='last')
                .rename(columns={
                    pid_key : 'PID',
                    name_key : 'TaskName'})
                .set_index('PID').sort_index())

    def getTaskByName(self, name):
        """
        Get the PIDs of all tasks with the specified name.

        The same PID can have different task names, mainly because once a task
        is generated it inherits the parent name and then its name is updated
        to represent what the task really is.

        This API works under the assumption that a task name is updated at
        most once, and it always considers the name a task had the last time
        it was scheduled for execution in the current trace.

        :param name: task name
        :type name: str

        :return: a list of PIDs of the tasks whose name matches the required
                 one, the last time they ran in the current trace
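
        Example::

            # Hypothetical task name; returns e.g. [3456, 3789]
            pids = trace.getTaskByName('sh')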
        """
        return (self._tasks_by_pid[self._tasks_by_pid.TaskName == name]
                    .index.tolist())

    def getTaskByPid(self, pid):
        """
        Get the name of the task with the specified PID.

        The same PID can have different task names, mainly because once a task
        is generated it inherits the parent name and then its name is
        updated to represent what the task really is.

        This API works under the assumption that a task name is updated at
        most once, and it always reports the name the task had the last time
        it was scheduled for execution in the current trace.

        :param pid: task PID
        :type pid: int

        :return: the name of the task whose PID matches the required one,
                 the last time it ran in the current trace
        """
        try:
            return self._tasks_by_pid.ix[pid].values[0]
        except KeyError:
            return None

    def getTgidFromPid(self, pid):
        return self._pid_tgid.ix[pid].values[0]

    def getTasks(self):
        """
        Get a dictionary of all the tasks in the Trace.

        :return: a dictionary which maps each PID to the corresponding task
                 name
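
        Example::

            # Sketch; the PIDs and names here are hypothetical
            tasks = trace.getTasks()   # e.g. {1: 'init', 2: 'kthreadd'}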
        """
        return self._tasks_by_pid.TaskName.to_dict()


###############################################################################
# DataFrame Getter Methods
###############################################################################

    def df(self, event):
        """
        Get a dataframe containing all occurrences of the specified trace event
        in the parsed trace.

        :param event: Trace event name
        :type event: str
        """
        warnings.simplefilter('always', DeprecationWarning)  # turn off filter
        warnings.warn("\n\tUse of Trace::df() is deprecated and will soon be removed."
                      "\n\tUse Trace::data_frame.trace_event(event_name) instead.",
                      category=DeprecationWarning)
        warnings.simplefilter('default', DeprecationWarning)  # reset filter
        return self._dfg_trace_event(event)

    def _dfg_trace_event(self, event):
        """
        Get a dataframe containing all occurrences of the specified trace event
        in the parsed trace.

        :param event: Trace event name
        :type event: str
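
        Example::

            # Sketch: assumes 'sched_switch' was among the parsed events
            switch_df = trace.data_frame.trace_event('sched_switch')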
        """
        if self.data_dir is None:
            raise ValueError("trace data not (yet) loaded")
        if self.ftrace and hasattr(self.ftrace, event):
            return getattr(self.ftrace, event).data_frame
        raise ValueError('Event [{}] not supported. '
                         'Supported events are: {}'
                         .format(event, self.available_events))

    def _dfg_functions_stats(self, functions=None):
        """
        Get a DataFrame of specified kernel functions profile data

        For each profiled function a DataFrame is returned which reports stats
        on kernel functions execution time. The reported stats are per-CPU and
        include: number of times the function has been executed (hits),
        average execution time (avg), overall execution time (time) and samples
        variance (s_2).
        By default, a DataFrame of all the profiled functions is returned.

        :param functions: the name of the function or a list of function names
                          to report
        :type functions: str or list(str)
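
        Example::

            # Sketch: assumes 'pick_next_task_fair' was among the profiled
            # functions
            stats_df = trace.data_frame.functions_stats('pick_next_task_fair')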
        """
        if not hasattr(self, '_functions_stats_df'):
            return None
        df = self._functions_stats_df
        if not functions:
            return df
        return df.loc[df.index.get_level_values(1).isin(listify(functions))]

    # cgroup_attach_task with fake and real events merged
    def _cgroup_attach_task(self):
        cgroup_events = ['cgroup_attach_task', 'cgroup_attach_task_devlib']
        df = None

        if set(cgroup_events).isdisjoint(set(self.available_events)):
            self._log.error('at least one of {} is needed for cgroup_attach_task event generation'.format(cgroup_events))
            return None

        for cev in cgroup_events:
            if cev not in self.available_events:
                continue
            cdf = self._dfg_trace_event(cev)
            cdf = cdf[['__line', 'pid', 'controller', 'cgroup']]
            if not isinstance(df, pd.DataFrame):
                df = cdf
            else:
                df = pd.concat([cdf, df])

        # Always drop na since this DF is used as secondary
        df.dropna(inplace=True, how='any')
        return df

    @memoized
    def _dfg_cgroup_attach_task(self, controllers = ['schedtune', 'cpuset']):
        # Since fork doesn't result in attach events, generate fake attach
        # events. The below mechanism doesn't propagate across nested fork
        # levels. For example:
        # cgroup_attach_task: pid=1166
        # fork: pid=1166 child_pid=2222  <-- fake attach generated
        # fork: pid=2222 child_pid=3333  <-- fake attach not generated
        def fork_add_cgroup(fdf, cdf, controller):
            cdf = cdf[cdf['controller'] == controller]
            ret_df = trappy.utils.merge_dfs(fdf, cdf, pivot='pid')
            return ret_df

        if 'sched_process_fork' not in self.available_events:
            self._log.error('sched_process_fork is mandatory to get proper cgroup_attach events')
            return None
        fdf = self._dfg_trace_event('sched_process_fork')

        forks_len = len(fdf)
        forkdf = fdf
        cdf = self._cgroup_attach_task()
        for idx, c in enumerate(controllers):
            fdf = fork_add_cgroup(fdf, cdf, c)
            if (idx != (len(controllers) - 1)):
                fdf = pd.concat([fdf, forkdf]).sort_values(by='__line')

        fdf = fdf[['__line', 'child_pid', 'controller', 'cgroup']]
        fdf.rename(columns = { 'child_pid': 'pid' }, inplace=True)

        # Always drop na since this DF is used as secondary
        fdf.dropna(inplace=True, how='any')

        new_forks_len = len(fdf) / len(controllers)

        fdf = pd.concat([fdf, cdf]).sort_values(by='__line')

        if new_forks_len < forks_len:
            dropped = forks_len - new_forks_len
            self._log.info("Couldn't attach cgroups to all forks with fake attach events ({} dropped)".format(dropped))
        return fdf
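
    # Usage sketch (the controller name is illustrative): the merged attach
    # events are exposed to clients through the getter registered on
    # `data_frame`:
    #
    #   attach_df = trace.data_frame.cgroup_attach_task(['schedtune'])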

    @memoized
    def _dfg_sched_switch_cgroup(self, controllers = ['schedtune', 'cpuset']):
        def sched_switch_add_cgroup(sdf, cdf, controller, direction):
            cdf = cdf[cdf['controller'] == controller]

            ret_df = sdf.rename(columns = { direction + '_pid': 'pid' })
            ret_df = trappy.utils.merge_dfs(ret_df, cdf, pivot='pid')
            ret_df.rename(columns = { 'pid': direction + '_pid' }, inplace=True)

            ret_df.drop('controller', axis=1, inplace=True)
            ret_df.rename(columns = { 'cgroup': direction + '_' + controller }, inplace=True)
            return ret_df

        if 'sched_switch' not in self.available_events:
            self._log.error('sched_switch is mandatory to generate sched_switch_cgroup event')
            return None
        sdf = self._dfg_trace_event('sched_switch')
        cdf = self._dfg_cgroup_attach_task()

        for c in controllers:
            sdf = sched_switch_add_cgroup(sdf, cdf, c, 'next')
            sdf = sched_switch_add_cgroup(sdf, cdf, c, 'prev')

        # Augment with TGID information
        sdf = sdf.join(self._pid_tgid, on='next_pid').rename(columns = {'tgid': 'next_tgid'})
        sdf = sdf.join(self._pid_tgid, on='prev_pid').rename(columns = {'tgid': 'prev_tgid'})

        df = self._tasks_by_pid.rename(columns = { 'next_comm': 'comm' })

        sdf = sdf.join(df, on='next_tgid').rename(columns = {'TaskName': 'next_tgid_comm'})
        sdf = sdf.join(df, on='prev_tgid').rename(columns = {'TaskName': 'prev_tgid_comm'})
        return sdf

###############################################################################
# Trace Events Sanitize Methods
###############################################################################
    @property
    def has_big_little(self):
        return ('clusters' in self.platform
                and 'big' in self.platform['clusters']
                and 'little' in self.platform['clusters']
                and 'nrg_model' in self.platform)

    def _sanitize_SchedCpuCapacity(self):
        """
        Add more columns to cpu_capacity data frame if the energy model is
        available and the platform is big.LITTLE.
        """
        if not self.hasEvents('cpu_capacity') \
           or 'nrg_model' not in self.platform \
           or not self.has_big_little:
            return

        df = self._dfg_trace_event('cpu_capacity')

        # Add column with LITTLE and big CPUs max capacities
        nrg_model = self.platform['nrg_model']
        max_lcap = nrg_model['little']['cpu']['cap_max']
        max_bcap = nrg_model['big']['cpu']['cap_max']
        df['max_capacity'] = np.select(
                [df.cpu.isin(self.platform['clusters']['little'])],
                [max_lcap], max_bcap)
        # Add LITTLE and big CPUs "tipping point" threshold
        tip_lcap = 0.8 * max_lcap
        tip_bcap = 0.8 * max_bcap
        df['tip_capacity'] = np.select(
                [df.cpu.isin(self.platform['clusters']['little'])],
                [tip_lcap], tip_bcap)

    def _sanitize_SchedLoadAvgCpu(self):
        """
        If necessary, rename certain signal names from v5.0 to v5.1 format.
        """
        if not self.hasEvents('sched_load_avg_cpu'):
            return
        df = self._dfg_trace_event('sched_load_avg_cpu')
        if 'utilization' in df:
            df.rename(columns={'utilization': 'util_avg'}, inplace=True)
            df.rename(columns={'load': 'load_avg'}, inplace=True)

    def _sanitize_SchedLoadAvgTask(self):
        """
        If necessary, rename certain signal names from v5.0 to v5.1 format.
        """
        if not self.hasEvents('sched_load_avg_task'):
            return
        df = self._dfg_trace_event('sched_load_avg_task')
        if 'utilization' in df:
            df.rename(columns={'utilization': 'util_avg'}, inplace=True)
            df.rename(columns={'load': 'load_avg'}, inplace=True)
            df.rename(columns={'avg_period': 'period_contrib'}, inplace=True)
            df.rename(columns={'runnable_avg_sum': 'load_sum'}, inplace=True)
            df.rename(columns={'running_avg_sum': 'util_sum'}, inplace=True)

        if not self.has_big_little:
            return

        df['cluster'] = np.select(
                [df.cpu.isin(self.platform['clusters']['little'])],
                ['LITTLE'], 'big')

        if 'nrg_model' not in self.platform:
            return

        # Add a column which represents the max capacity of the smallest
        # cluster which can accommodate the task utilization
        little_cap = self.platform['nrg_model']['little']['cpu']['cap_max']
        big_cap = self.platform['nrg_model']['big']['cpu']['cap_max']
        df['min_cluster_cap'] = df.util_avg.map(
            lambda util_avg: big_cap if util_avg > little_cap else little_cap
        )

    def _sanitize_SchedBoostCpu(self):
        """
        Add a boosted utilization signal as the sum of utilization and margin.

        Also, if necessary, rename certain signal names from v5.0 to v5.1
        format.
        """
        if not self.hasEvents('sched_boost_cpu'):
            return
        df = self._dfg_trace_event('sched_boost_cpu')
        if 'usage' in df:
            df.rename(columns={'usage': 'util'}, inplace=True)
        df['boosted_util'] = df['util'] + df['margin']

    def _sanitize_SchedBoostTask(self):
        """
        Add a boosted utilization signal as the sum of utilization and margin.

        Also, if necessary, rename certain signal names from v5.0 to v5.1
        format.
        """
        if not self.hasEvents('sched_boost_task'):
            return
        df = self._dfg_trace_event('sched_boost_task')
        if 'utilization' in df:
            # Convert signal names to the v5.1 format
            df.rename(columns={'utilization': 'util'}, inplace=True)
        df['boosted_util'] = df['util'] + df['margin']

    def _sanitize_SchedEnergyDiff(self):
        """
        If an energy model is provided, some signals are added to the
        sched_energy_diff trace event data frame.

        Also convert between existing field name formats for sched_energy_diff.
        """
        if not self.hasEvents('sched_energy_diff') \
           or 'nrg_model' not in self.platform \
           or not self.has_big_little:
            return
        nrg_model = self.platform['nrg_model']
        em_lcluster = nrg_model['little']['cluster']
        em_bcluster = nrg_model['big']['cluster']
        em_lcpu = nrg_model['little']['cpu']
        em_bcpu = nrg_model['big']['cpu']
        lcpus = len(self.platform['clusters']['little'])
        bcpus = len(self.platform['clusters']['big'])
        SCHED_LOAD_SCALE = 1024

        power_max = em_lcpu['nrg_max'] * lcpus + em_bcpu['nrg_max'] * bcpus + \
            em_lcluster['nrg_max'] + em_bcluster['nrg_max']
        self._log.debug(
            "Maximum estimated system energy: {0:d}".format(power_max))

        df = self._dfg_trace_event('sched_energy_diff')

        translations = {'nrg_d' : 'nrg_diff',
                        'utl_d' : 'usage_delta',
                        'payoff' : 'nrg_payoff'
        }
        df.rename(columns=translations, inplace=True)

        df['nrg_diff_pct'] = SCHED_LOAD_SCALE * df.nrg_diff / power_max

        # Tag columns by usage_delta
        ccol = df.usage_delta
        df['usage_delta_group'] = np.select(
            [ccol < 150, ccol < 400, ccol < 600],
            ['< 150', '< 400', '< 600'], '>= 600')

        # Tag columns by nrg_payoff
        ccol = df.nrg_payoff
        df['nrg_payoff_group'] = np.select(
            [ccol > 2e9, ccol > 0, ccol > -2e9],
            ['Optimal Accept', 'SchedTune Accept', 'SchedTune Reject'],
            'Suboptimal Reject')

    def _sanitize_SchedOverutilized(self):
        """ Add a column with overutilized status duration. """
        if not self.hasEvents('sched_overutilized'):
            return
        df = self._dfg_trace_event('sched_overutilized')
        df['start'] = df.index
        df['len'] = (df.start - df.start.shift()).fillna(0).shift(-1)
        df.drop('start', axis=1, inplace=True)

        # Fix the last event, which will have a NaN duration
        # Set duration to trace_end - last_event
        df.loc[df.index[-1], 'len'] = self.start_time + self.time_range - df.index[-1]

        # Build a stat on trace overutilization
        df = self._dfg_trace_event('sched_overutilized')
        self.overutilized_time = df[df.overutilized == 1].len.sum()
        self.overutilized_prc = 100. * self.overutilized_time / self.time_range

        self._log.debug('Overutilized time: %.6f [s] (%.3f%% of trace time)',
                        self.overutilized_time, self.overutilized_prc)

    # Sanitize cgroup information helper
    def _helper_sanitize_CgroupAttachTask(self, df, allowed_cgroups, controller_id_name):
        # Drop rows that aren't in the root-id -> name map
        df = df[df['dst_root'].isin(controller_id_name.keys())]

        def get_cgroup_name(path, valid_names):
            name = os.path.basename(path)
            name = 'root' if name not in valid_names else name
            return name

        def get_cgroup_names(rows):
            ret = []
            for r in rows.iterrows():
                 ret.append(get_cgroup_name(r[1]['dst_path'], allowed_cgroups))
            return ret

        def get_controller_names(rows):
            ret = []
            for r in rows.iterrows():
                 ret.append(controller_id_name[r[1]['dst_root']])
            return ret

        # Sanitize cgroup names.
        # The cgroup column isn't in mainline, so add it in. It's already
        # added for some out-of-tree kernels, so check first.
        if 'cgroup' not in df.columns:
            if 'dst_path' not in df.columns:
                raise RuntimeError("Can't sanitize cgroup DF, need dst_path")
            df = df.assign(cgroup = get_cgroup_names)

        # Sanitize controller names
        if 'controller' not in df.columns:
            if 'dst_root' not in df.columns:
                raise RuntimeError("Can't sanitize cgroup DF, need dst_root")
            df = df.assign(controller = get_controller_names)

        return df

    def _sanitize_CgroupAttachTask(self):
        def sanitize_cgroup_event(name):
            if name not in self.available_events:
                return

            df = self._dfg_trace_event(name)

            if len(df.groupby(level=0).filter(lambda x: len(x) > 1)) > 0:
                self._log.warning('Timestamp collisions seen in {} event!'.format(name))

            df = self._helper_sanitize_CgroupAttachTask(df, self.cgroup_info['cgroups'],
                                              self.cgroup_info['controller_ids'])
            getattr(self.ftrace, name).data_frame = df
        sanitize_cgroup_event('cgroup_attach_task')
        sanitize_cgroup_event('cgroup_attach_task_devlib')

    def _chunker(self, seq, size):
        """
        Given a data frame or a series, generate a sequence of chunks of the
        given size.

        :param seq: data to be split into chunks
        :type seq: :mod:`pandas.Series` or :mod:`pandas.DataFrame`

        :param size: size of each chunk
        :type size: int
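
        Example::

            # Sketch: iterate a Series two elements at a time
            for chunk in trace._chunker(pd.Series([0, 1, 2, 3]), 2):
                print chunk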
        """
        return (seq.iloc[pos:pos + size] for pos in range(0, len(seq), size))

    def _sanitize_CpuFrequency(self):
        """
        Verify that all platform reported clusters are frequency coherent (i.e.
        frequency scaling is performed at a cluster level).
        """
        if not self.hasEvents('cpu_frequency_devlib') \
           or 'clusters' not in self.platform:
            return

        devlib_freq = self._dfg_trace_event('cpu_frequency_devlib')
        devlib_freq.rename(columns={'cpu_id':'cpu'}, inplace=True)
        devlib_freq.rename(columns={'state':'frequency'}, inplace=True)

        df = self._dfg_trace_event('cpu_frequency')
        clusters = self.platform['clusters']

        # devlib always introduces fake cpu_frequency events; in case the
        # OS has not generated cpu_frequency events, they are the only
        # frequency events to report
        if len(df) == 0:
            # Register devlib injected events as 'cpu_frequency' events
            setattr(self.ftrace.cpu_frequency, 'data_frame', devlib_freq)
            df = devlib_freq
            self.available_events.append('cpu_frequency')

        # make sure fake cpu_frequency events are never interleaved with
        # OS generated events
        else:
            if len(devlib_freq) > 0:

                # Frequency injection is done on a per-cluster basis.
                # This is based on the assumption that clusters are
                # frequency coherent.
                # For each cluster we inject devlib events only if
                # these events do not overlap with os-generated ones.

                # Inject "initial" devlib frequencies
                os_df = df
                dl_df = devlib_freq.iloc[:self.platform['cpus_count']]
                for _,c in self.platform['clusters'].iteritems():
                    dl_freqs = dl_df[dl_df.cpu.isin(c)]
                    os_freqs = os_df[os_df.cpu.isin(c)]
                    self._log.debug("First freqs for %s:\n%s", c, dl_freqs)
                    # All devlib events "before" os-generated events
                    self._log.debug("Min os freq @: %s", os_freqs.index.min())
                    if os_freqs.empty or \
                       os_freqs.index.min() > dl_freqs.index.max():
                        self._log.debug("Insert devlib freqs for %s", c)
                        df = pd.concat([dl_freqs, df])

                # Inject "final" devlib frequencies
                os_df = df
                dl_df = devlib_freq.iloc[self.platform['cpus_count']:]
                for _,c in self.platform['clusters'].iteritems():
                    dl_freqs = dl_df[dl_df.cpu.isin(c)]
                    os_freqs = os_df[os_df.cpu.isin(c)]
                    self._log.debug("Last freqs for %s:\n%s", c, dl_freqs)
                    # All devlib events "after" os-generated events
                    self._log.debug("Max os freq @: %s", os_freqs.index.max())
                    if os_freqs.empty or \
                       os_freqs.index.max() < dl_freqs.index.min():
                        self._log.debug("Append devlib freqs for %s", c)
                        df = pd.concat([df, dl_freqs])

                df.sort_index(inplace=True)

            setattr(self.ftrace.cpu_frequency, 'data_frame', df)

        # Frequency Coherency Check
        for _, cpus in clusters.iteritems():
            cluster_df = df[df.cpu.isin(cpus)]
            for chunk in self._chunker(cluster_df, len(cpus)):
                f = chunk.iloc[0].frequency
                if any(chunk.frequency != f):
                    self._log.warning('Cluster frequency is not coherent! '
                                      'Failure in [cpu_frequency] events at:')
                    self._log.warning(chunk)
                    self.freq_coherency = False
                    return
        self._log.info('Platform clusters verified to be frequency coherent')

###############################################################################
# Utility Methods
###############################################################################

    def integrate_square_wave(self, sq_wave):
        """
        Compute the integral of a square wave time series.

        :param sq_wave: square wave assuming only 1.0 and 0.0 values
        :type sq_wave: :mod:`pandas.Series`
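
        Example::

            # A signal at 1.0 over [1.0, 3.0) and [5.0, 6.0) integrates to 3.0
            s = pd.Series([1.0, 0.0, 1.0, 0.0], index=[1.0, 3.0, 5.0, 6.0])
            trace.integrate_square_wave(s)   # -> 3.0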
        """
        sq_wave.iloc[-1] = 0.0
        # Compact signal to obtain only 1-0-1-0 sequences
        comp_sig = sq_wave.loc[sq_wave.shift() != sq_wave]
        # First value for computing the difference must be a 1
        if comp_sig.iloc[0] == 0.0:
            return sum(comp_sig.iloc[2::2].index - comp_sig.iloc[1:-1:2].index)
        else:
            return sum(comp_sig.iloc[1::2].index - comp_sig.iloc[:-1:2].index)

    def _loadFunctionsStats(self, path='trace.stats'):
        """
        Read functions profiling file and build a data frame containing all
        relevant data.

        :param path: path to the functions profiling trace file
        :type path: str
        """
        if os.path.isdir(path):
            path = os.path.join(path, 'trace.stats')
        if (path.endswith('dat') or
            path.endswith('txt') or
            path.endswith('html')):
            pre, ext = os.path.splitext(path)
            path = pre + '.stats'
        if not os.path.isfile(path):
            return False

        # Opening functions profiling JSON data file
        self._log.debug('Loading functions profiling data from [%s]...', path)
        with open(os.path.join(path), 'r') as fh:
            trace_stats = json.load(fh)

        # Build DataFrame of function stats
        frames = {}
        for cpu, data in trace_stats.iteritems():
            frames[int(cpu)] = pd.DataFrame.from_dict(data, orient='index')

        # Build and keep track of the DataFrame
        self._functions_stats_df = pd.concat(frames.values(),
                                             keys=frames.keys())

        return len(self._functions_stats_df) > 0

    @memoized
    def getCPUActiveSignal(self, cpu):
        """
        Build a square wave representing the active (i.e. non-idle) CPU time,
        i.e.:

          cpu_active[t] == 1 if the CPU is reported to be non-idle by cpuidle
          at time t
          cpu_active[t] == 0 otherwise

        :param cpu: CPU ID
        :type cpu: int

        :returns: A :mod:`pandas.Series` or ``None`` if the trace contains no
                  "cpu_idle" events
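
        Example::

            # Sketch: total busy time of CPU0, assuming cpu_idle events
            # were collected
            active = trace.getCPUActiveSignal(0)
            if active is not None:
                busy_s = trace.integrate_square_wave(active)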
        """
        if not self.hasEvents('cpu_idle'):
            self._log.warning('Events [cpu_idle] not found, '
                              'cannot compute CPU active signal!')
            return None

        idle_df = self._dfg_trace_event('cpu_idle')
        cpu_df = idle_df[idle_df.cpu_id == cpu]

        cpu_active = cpu_df.state.apply(
            lambda s: 1 if s == NON_IDLE_STATE else 0
        )

        start_time = 0.0
        if not self.ftrace.normalized_time:
            start_time = self.ftrace.basetime

        if cpu_active.empty:
            cpu_active = pd.Series([0], index=[start_time])
        elif cpu_active.index[0] != start_time:
            entry_0 = pd.Series(cpu_active.iloc[0] ^ 1, index=[start_time])
            cpu_active = pd.concat([entry_0, cpu_active])

        # Fix sequences of wakeup/sleep events reported with the same index
        return handle_duplicate_index(cpu_active)


    @memoized
    def getClusterActiveSignal(self, cluster):
        """
        Build a square wave representing the active (i.e. non-idle) cluster
        time, i.e.:

          cluster_active[t] == 1 if at least one CPU is reported to be
          non-idle by cpuidle at time t
          cluster_active[t] == 0 otherwise

        :param cluster: list of CPU IDs belonging to a cluster
        :type cluster: list(int)

        :returns: A :mod:`pandas.Series` or ``None`` if the trace contains no
                  "cpu_idle" events
        """
        if not self.hasEvents('cpu_idle'):
            self._log.warning('Events [cpu_idle] not found, '
                              'cannot compute cluster active signal!')
            return None

        active = self.getCPUActiveSignal(cluster[0]).to_frame(name=cluster[0])
        for cpu in cluster[1:]:
            active = active.join(
                self.getCPUActiveSignal(cpu).to_frame(name=cpu),
                how='outer'
            )

        active.fillna(method='ffill', inplace=True)
        # There might be NaNs in the signal where we got data from some CPUs
        # before others. That will break the .astype(int) below, so drop rows
        # with NaN in them.
        active.dropna(inplace=True)

        # Cluster active is the OR between the actives on each CPU
        # belonging to that specific cluster
        cluster_active = reduce(
            operator.or_,
            [cpu_active.astype(int) for _, cpu_active in
             active.iteritems()]
        )

        return cluster_active


class TraceData:
    """ A DataFrame collector exposed to Trace's clients """
    pass

# vim :set tabstop=4 shiftwidth=4 expandtab