# Copyright 2015-2016 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
:mod:`bart.sched.SchedAssert` provides ability to assert scheduler behaviour.
The analysis is based on TRAPpy's statistics framework and is potent enough
to aggregate statistics over processor hierarchies.
"""

import trappy
import itertools
import math
from trappy.stats.Aggregator import MultiTriggerAggregator
from bart.sched import functions as sched_funcs
from bart.common import Utils
import numpy as np

# pylint: disable=invalid-name
# pylint: disable=too-many-arguments
class SchedAssert(object):

    """The primary focus of this class is to assert and verify
    predefined scheduler scenarios. This does not compare parameters
    across runs

    :param ftrace: A single trappy.FTrace object
        or a path that can be passed to trappy.FTrace
    :type ftrace: :mod:`trappy.ftrace.FTrace`

    :param topology: A topology that describes the arrangement of
        CPU's on a system. This is useful for multi-cluster systems
        where data needs to be aggregated at different topological
        levels
    :type topology: :mod:`trappy.stats.Topology.Topology`

    :param execname: The execname of the task to be analysed

        .. note::

                There should be only one PID that maps to the
                specified execname. If there are multiple PIDs
                :mod:`bart.sched.SchedMultiAssert` should be used

    :type execname: str

    :param pid: The process ID of the task to be analysed
    :type pid: int

    .. note:

        One of pid or execname is mandatory. If only execname
        is specified, The current implementation will fail if
        there are more than one processes with the same execname
    """

    def __init__(self, ftrace, topology, execname=None, pid=None):

        ftrace = Utils.init_ftrace(ftrace)

        if not execname and not pid:
            raise ValueError("Need to specify at least one of pid or execname")

        self.execname = execname
        self._ftrace = ftrace
        self._pid = self._validate_pid(pid)
        self._aggs = {}
        self._topology = topology
        self._triggers = sched_funcs.sched_triggers(self._ftrace, self._pid,
                                              trappy.sched.SchedSwitch)
        self.name = "{}-{}".format(self.execname, self._pid)

    def _validate_pid(self, pid):
        """Validate the passed pid argument"""

        if not pid:
            pids = sched_funcs.get_pids_for_process(self._ftrace,
                                              self.execname)

            if len(pids) != 1:
                raise RuntimeError(
                    "There should be exactly one PID {0} for {1}".format(
                        pids,
                        self.execname))

            return pids[0]

        elif self.execname:

            pids = sched_funcs.get_pids_for_process(self._ftrace,
                                              self.execname)
            if pid not in pids:
                raise RuntimeError(
                    "PID {0} not mapped to {1}".format(
                        pid,
                        self.execname))
        else:
            self.execname = sched_funcs.get_task_name(self._ftrace, pid)

        return pid

    def _aggregator(self, aggfunc):
        """
        Return an aggregator corresponding to the
        aggfunc, the aggregators are memoized for performance

        :param aggfunc: Function parameter that
            accepts a :mod:`pandas.Series` object and
            returns a vector/scalar

        :type: function(:mod:`pandas.Series`)
        """

        if aggfunc not in self._aggs.keys():
            self._aggs[aggfunc] = MultiTriggerAggregator(self._triggers,
                                                         self._topology,
                                                         aggfunc)
        return self._aggs[aggfunc]

    def getResidency(self, level, node, window=None, percent=False):
        """
        Residency of the task is the amount of time it spends executing
        a particular group of a topological level. For example:
        ::

            from trappy.stats.Topology import Topology

            big = [1, 2]
            little = [0, 3, 4, 5]

            topology = Topology(clusters=[little, big])

            s = SchedAssert(trace, topology, pid=123)
            s.getResidency("cluster", big)

        This will return the residency of the task on the big cluster. If
        percent is specified it will be normalized to the total runtime
        of the task

        :param level: The topological level to which the group belongs
        :type level: str

        :param node: The group of CPUs for which residency
            needs to calculated
        :type node: list

        :param window: A (start, end) tuple to limit the scope of the
            residency calculation.
        :type window: tuple

        :param percent: If true the result is normalized to the total runtime
            of the task and returned as a percentage
        :type percent: bool

        .. math::

            R = \\frac{T_{group} \\times 100}{T_{total}}

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertResidency`
        """

        # Get the index of the node in the level
        node_index = self._topology.get_index(level, node)

        agg = self._aggregator(sched_funcs.residency_sum)
        level_result = agg.aggregate(level=level, window=window)

        node_value = level_result[node_index]

        if percent:
            total = agg.aggregate(level="all", window=window)[0]
            node_value = node_value * 100
            node_value = node_value / total

        return node_value

    def assertResidency(
            self,
            level,
            node,
            expected_value,
            operator,
            window=None,
            percent=False):
        """
        :param level: The topological level to which the group belongs
        :type level: str

        :param node: The group of CPUs for which residency
            needs to calculated
        :type node: list

        :param expected_value: The expected value of the residency
        :type expected_value: double

        :param operator: A binary operator function that returns
            a boolean. For example:
            ::

                import operator
                op = operator.ge
                assertResidency(level, node, expected_value, op)

            Will do the following check:
            ::

                getResidency(level, node) >= expected_value

            A custom function can also be passed:
            ::

                THRESHOLD=5
                def between_threshold(a, expected):
                    return abs(a - expected) <= THRESHOLD

        :type operator: function

        :param window: A (start, end) tuple to limit the scope of the
            residency calculation.
        :type window: tuple

        :param percent: If true the result is normalized to the total runtime
            of the task and returned as a percentage
        :type percent: bool

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getResidency`
        """
        node_value = self.getResidency(level, node, window, percent)
        return operator(node_value, expected_value)

    def getStartTime(self):
        """
        :return: The first time the task ran across all the CPUs
        """

        agg = self._aggregator(sched_funcs.first_time)
        result = agg.aggregate(level="all", value=sched_funcs.TASK_RUNNING)
        return min(result[0])

    def getEndTime(self):
        """
        :return: The last time the task ran across
            all the CPUs
        """

        # BUGFIX: a redundant first_time aggregator was previously built here
        # and immediately discarded; only the last_time aggregator is needed.
        agg = self._aggregator(sched_funcs.last_time)
        result = agg.aggregate(level="all", value=sched_funcs.TASK_RUNNING)
        return max(result[0])

    def _relax_switch_window(self, series, direction, window):
        """
        direction == "left"
            return the last time the task was running
            if no such time exists in the window,
            extend the window's left extent to
            getStartTime

        direction == "right"
            return the first time the task was running
            in the window. If no such time exists in the
            window, extend the window's right extent to
            getEndTime()

        The function returns a None if
        len(series[series == TASK_RUNNING]) == 0
        even in the extended window
        """

        series = series[series == sched_funcs.TASK_RUNNING]
        w_series = sched_funcs.select_window(series, window)
        start, stop = window

        if direction == "left":
            if len(w_series):
                return w_series.index.values[-1]
            else:
                start_time = self.getStartTime()
                w_series = sched_funcs.select_window(
                    series,
                    window=(
                        start_time,
                        start))

                if not len(w_series):
                    return None
                else:
                    return w_series.index.values[-1]

        elif direction == "right":
            if len(w_series):
                return w_series.index.values[0]
            else:
                end_time = self.getEndTime()
                w_series = sched_funcs.select_window(series, window=(stop, end_time))

                if not len(w_series):
                    return None
                else:
                    return w_series.index.values[0]
        else:
            raise ValueError("direction should be either left or right")

    def assertSwitch(
            self,
            level,
            from_node,
            to_node,
            window,
            ignore_multiple=True):
        """
        This function asserts that there is context switch from the
        :code:`from_node` to the :code:`to_node`:

        :param level: The topological level to which the group belongs
        :type level: str

        :param from_node: The node from which the task switches out
        :type from_node: list

        :param to_node: The node to which the task switches
        :type to_node: list

        :param window: A (start, end) tuple to limit the scope of the
            residency calculation.
        :type window: tuple

        :param ignore_multiple: If true, the function will ignore multiple
           switches in the window, If false the assert will be true if and
           only if there is a single switch within the specified window
        :type ignore_multiple: bool
        """

        from_node_index = self._topology.get_index(level, from_node)
        to_node_index = self._topology.get_index(level, to_node)

        agg = self._aggregator(sched_funcs.csum)
        level_result = agg.aggregate(level=level)

        from_node_result = level_result[from_node_index]
        to_node_result = level_result[to_node_index]

        from_time = self._relax_switch_window(from_node_result, "left", window)
        if ignore_multiple:
            to_time = self._relax_switch_window(to_node_result, "left", window)
        else:
            to_time = self._relax_switch_window(
                to_node_result,
                "right", window)

        # BUGFIX: _relax_switch_window signals "not found" with None; a valid
        # timestamp of 0.0 is falsy and must not be treated as a failure.
        if from_time is not None and to_time is not None:
            if from_time < to_time:
                return True

        return False

    def getRuntime(self, window=None, percent=False):
        """Return the Total Runtime of a task

        :param window: A (start, end) tuple to limit the scope of the
            residency calculation.
        :type window: tuple

        :param percent: If True, the result is returned
            as a percentage of the total execution time
            of the run.
        :type percent: bool

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertRuntime`
        """

        agg = self._aggregator(sched_funcs.residency_sum)
        run_time = agg.aggregate(level="all", window=window)[0]

        if percent:

            if window:
                begin, end = window
                total_time = end - begin
            else:
                total_time = self._ftrace.get_duration()

            run_time = run_time * 100
            run_time = run_time / total_time

        return run_time

    def assertRuntime(
            self,
            expected_value,
            operator,
            window=None,
            percent=False):
        """Assert on the total runtime of the task

        :param expected_value: The expected value of the runtime
        :type expected_value: double

        :param operator: A binary operator function that returns
            a boolean. For example:
            ::

                import operator
                op = operator.ge
                assertRuntime(expected_value, op)

            Will do the following check:
            ::

                getRuntime() >= expected_value

            A custom function can also be passed:
            ::

                THRESHOLD=5
                def between_threshold(a, expected):
                    return abs(a - expected) <= THRESHOLD

        :type operator: function

        :param window: A (start, end) tuple to limit the scope of the
            residency calculation.
        :type window: tuple

        :param percent: If True, the result is returned
            as a percentage of the total execution time
            of the run.
        :type percent: bool

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getRuntime`
        """

        run_time = self.getRuntime(window, percent)
        return operator(run_time, expected_value)

    def getPeriod(self, window=None, align="start"):
        """Return the period of the task in (ms)

        Let's say a task started execution at the following times:

            .. math::

                T_1, T_2, ...T_n

        The period is defined as:

            .. math::

                Median((T_2 - T_1), (T_4 - T_3), ....(T_n - T_{n-1}))

        :param window: A (start, end) tuple to limit the scope of the
            residency calculation.
        :type window: tuple

        :param align:
            :code:`"start"` aligns period calculation to switch-in events
            :code:`"end"` aligns the calculation to switch-out events
        :type param: str

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertPeriod`
        """

        agg = self._aggregator(sched_funcs.period)
        deltas = agg.aggregate(level="all", window=window)[0]

        if not len(deltas):
            return float("NaN")
        else:
            return np.median(deltas) * 1000

    def assertPeriod(
            self,
            expected_value,
            operator,
            window=None,
            align="start"):
        """Assert on the period of the task

        :param expected_value: The expected value of the runtime
        :type expected_value: double

        :param operator: A binary operator function that returns
            a boolean. For example:
            ::

                import operator
                op = operator.ge
                assertPeriod(expected_value, op)

            Will do the following check:
            ::

                getPeriod() >= expected_value

            A custom function can also be passed:
            ::

                THRESHOLD=5
                def between_threshold(a, expected):
                    return abs(a - expected) <= THRESHOLD

        :param window: A (start, end) tuple to limit the scope of the
            calculation.
        :type window: tuple

        :param align:
            :code:`"start"` aligns period calculation to switch-in events
            :code:`"end"` aligns the calculation to switch-out events
        :type param: str

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getPeriod`
        """

        period = self.getPeriod(window, align)
        return operator(period, expected_value)

    def getDutyCycle(self, window):
        """Return the duty cycle of the task

        :param window: A (start, end) tuple to limit the scope of the
            calculation.
        :type window: tuple

        Duty Cycle:
            The percentage of time the task spends executing
            in the given window of time

            .. math::

                \\delta_{cycle} = \\frac{T_{exec} \\times 100}{T_{window}}

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertDutyCycle`
        """

        return self.getRuntime(window, percent=True)

    def assertDutyCycle(self, expected_value, operator, window):
        """
        :param expected_value: The expected value of the duty cycle
        :type expected_value: double

        :param operator: A binary operator function that returns
            a boolean. For example:
            ::

                import operator
                op = operator.ge
                assertDutyCycle(expected_value, op, window)

            Will do the following check:
            ::

                getDutyCycle(window) >= expected_value

            A custom function can also be passed:
            ::

                THRESHOLD=5
                def between_threshold(a, expected):
                    return abs(a - expected) <= THRESHOLD

        :type operator: function

        :param window: A (start, end) tuple to limit the scope of the
            calculation.
        :type window: tuple

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getDutyCycle`

        """
        return self.assertRuntime(
            expected_value,
            operator,
            window,
            percent=True)

    def getFirstCpu(self, window=None):
        """
        :return: The first CPU the task ran on

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertFirstCPU`
        """

        agg = self._aggregator(sched_funcs.first_cpu)
        result = agg.aggregate(level="cpu", window=window)
        result = list(itertools.chain.from_iterable(result))

        min_time = min(result)
        if math.isinf(min_time):
            return -1
        index = result.index(min_time)
        return self._topology.get_node("cpu", index)[0]

    def assertFirstCpu(self, cpus, window=None):
        """
        Check if the Task started (first ran on in the duration
        of the trace) on a particular CPU(s)

        :param cpus: A list of acceptable CPUs
        :type cpus: int, list

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getFirstCPU`
        """

        first_cpu = self.getFirstCpu(window=window)
        cpus = Utils.listify(cpus)
        return first_cpu in cpus

    def getLastCpu(self, window=None):
        """Return the last CPU the task ran on"""

        agg = self._aggregator(sched_funcs.last_cpu)
        result = agg.aggregate(level="cpu", window=window)
        result = list(itertools.chain.from_iterable(result))

        end_time = max(result)
        if not end_time:
            return -1

        return result.index(end_time)

    def generate_events(self, level, start_id=0, window=None):
        """Generate events for the trace plot

        .. note::
            This is an internal function accessed by the
            :mod:`bart.sched.SchedMultiAssert` class for plotting data
        """

        agg = self._aggregator(sched_funcs.trace_event)
        result = agg.aggregate(level=level, window=window)
        events = []

        for idx, level_events in enumerate(result):
            if not len(level_events):
                continue
            events += np.column_stack((level_events, np.full(len(level_events), idx))).tolist()

        return sorted(events, key = lambda x : x[0])

    def plot(self, level="cpu", window=None, xlim=None):
        """
        :return: :mod:`trappy.plotter.AbstractDataPlotter` instance
            Call :func:`view` to draw the graph
        """

        if not xlim:
            if not window:
                xlim = [0, self._ftrace.get_duration()]
            else:
                xlim = list(window)

        events = {}
        events[self.name] = self.generate_events(level, window)
        names = [self.name]
        num_lanes = self._topology.level_span(level)
        lane_prefix = level.upper() + ": "
        return trappy.EventPlot(events, names, xlim,
                                lane_prefix=lane_prefix,
                                num_lanes=num_lanes)