# SPDX-License-Identifier: Apache-2.0 # # Copyright (C) 2016, ARM Limited and contributors. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import json import time import re import pandas import StringIO from unittest import SkipTest from env import TestEnv from test import LisaTest """ Goal ==== Check that the configuration of a given device is suitable for running EAS. Detailed Description ==================== This test reads the kernel configuration and digs around in sysfs to check the following attributes are true: * the minimum set of required config options are enabled * all CPUs have access to the 'sched' CPUFreq governor * energy aware scheduling is present and enabled Expected Behaviour ================== All required config options are set, sched governor is present. """ TEST_CONF = { 'modules': ['cpufreq'], 'results_dir': 'PreliminaryTests', 'tools': [ 'sysbench', ] } class BasicCheckTest(LisaTest): @classmethod def setUpClass(cls): cls.env = TestEnv(test_conf=TEST_CONF) cls.target = cls.env.target class TestSchedGovernor(BasicCheckTest): def test_sched_governor_available(self): """ Check that the 'sched' or 'schedutil' cpufreq governor is available """ fail_list = [] for cpu in self.target.list_online_cpus(): governors = self.target.cpufreq.list_governors(cpu) if 'sched' not in governors and 'schedutil' not in governors: fail_list.append(cpu) msg = 'CPUs {} do not support sched[util] cpufreq governor'.format( fail_list) self.assertTrue(len(fail_list) == 0, msg=msg) class TestKernelConfig(BasicCheckTest): def test_kernel_config(self): """ Check that the kernel config has the basic requirements for EAS """ kernel_config = self.target.config if not kernel_config.text: raise SkipTest('Kernel config not available on target') # NB: We don't test for schedtune/schedutil, that's tested by # TestSchedGovernor. necessary_configs = [ # 'CONFIG_CPU_FREQ_STAT', 'CONFIG_CGROUPS', 'CONFIG_SMP', 'CONFIG_SCHED_MC', 'CONFIG_CPU_FREQ', 'CONFIG_CPU_IDLE', 'CONFIG_SCHED_DEBUG', ] fail_list = [c for c in necessary_configs if not kernel_config.is_enabled(c)] if len(fail_list): message = 'Missing kernel configs: ' + ', '.join(fail_list) self.assertTrue(len(fail_list) == 0, msg=message) class TestWorkThroughput(BasicCheckTest): """ Check that compute throughput increases with CPU frequency That is, check that cpufreq really works in that setting a higher frequency provides greater CPU performance """ def _run_sysbench_work(self, cpu, duration): """ Run benchmark using 1 thread on a given CPU. :param cpu: cpu to run the benchmark on :type cpu: str :param duration: length of time, in seconds to run the benchmark :returns: float - performance score """ args = '--test=cpu --num-threads=1 --max-time={} run'.format(duration) sysbench = self.target.path.join(self.target.executables_directory, 'sysbench') bench_out = self.target.invoke(sysbench, args=args, on_cpus=[cpu]) match = re.search(r'(total number of events:\s*)([\d.]*)', bench_out) return float(match.group(2)) def _check_work_throughput(self, cpu, duration, margin): frequencies = self.target.cpufreq.list_frequencies(cpu) if len(frequencies) == 1: return True original_governor = self.target.cpufreq.get_governor(cpu) original_freq = None if original_governor == 'userspace': original_freq = self.target.cpufreq.get_frequency(cpu) # Set userspace governor self.target.cpufreq.set_governor(cpu, 'userspace') # Run at lowest & highest freq result = {} for freq in [frequencies[0], frequencies[-1]]: self.target.cpufreq.set_frequency(cpu, freq) result[freq] = self._run_sysbench_work(cpu, duration) # Restore governor self.target.cpufreq.set_governor(cpu, original_governor) if original_freq: self.target.cpufreq.set_frequency(cpu, original_freq) # Make sure work done at highest OPP is at least some % higher # than work done at lowest OPP - this filters the # +/- 1 sysbench result noise work_diff = result[frequencies[-1]] - result[frequencies[0]] ok = work_diff > result[frequencies[0]] * margin return ok def test_work_throughput(self): duration = 1.0 margin = 0.1 failed_cpus = [] # Run test on each known cpu for cpu in range(self.target.number_of_cpus): if not self._check_work_throughput(cpu, duration, margin): failed_cpus.append(cpu) # Format error message msg='Problems detected on CPUs: {}\n'\ 'Work at highest OPP wasn\'t {}% bigger than work at lowest OPP on these CPUs'\ .format(failed_cpus, margin * 100) self.assertFalse(len(failed_cpus), msg=msg) class TestEnergyModelPresent(BasicCheckTest): def test_energy_model_present(self): """Test that we can see the energy model in sysctl""" if not self.target.file_exists( '/proc/sys/kernel/sched_domain/cpu0/domain0/group0/energy/'): raise AssertionError( 'No energy model visible in procfs. Possible causes: \n' '- Kernel built without (CONFIG_SCHED_DEBUG && CONFIG_SYSCTL)\n' '- No energy model in kernel') class TestSchedutilTunables(BasicCheckTest): MAX_RATE_LIMIT_US = 20 * 1e3 def test_rate_limit_not_too_high(self): """Test that the schedutil ratelimiting is not too harsh""" governors = self.target.cpufreq.list_governors(0) if 'schedutil' not in governors: raise SkipTest('schedutil not present on target') self.target.cpufreq.set_all_governors('schedutil') cpus = set(range(self.target.number_of_cpus)) fail_cpus = [] while cpus: cpu = iter(cpus).next() domain = tuple(self.target.cpufreq.get_related_cpus(cpu)) tunables = self.target.cpufreq.get_governor_tunables(cpu) for name, value in tunables.iteritems(): if name.endswith('rate_limit_us'): if int(value) > self.MAX_RATE_LIMIT_US: fail_cpus += domain cpus = cpus.difference(domain) self.assertTrue( fail_cpus == [], 'schedutil rate limit greater than {}us on CPUs {}. ' 'Responsiveness will be affected.'.format( self.MAX_RATE_LIMIT_US, fail_cpus)) class TestSchedDomainFlags(BasicCheckTest): """Test requirements of sched_domain flags""" # See include/linux/sched.h in an EAS kernel SD_ASYM_CPUCAPACITY = 0x0040 SD_SHARE_CAP_STATES = 0x8000 def setUp(self): if not self.target.file_exists('/proc/sys/kernel/sched_domain/'): raise SkipTest('sched_domain info not exposed in procfs. ' 'Enable CONFIG_SCHED_DEBUG in target kernel') def iter_cpu_sd_flags(self, cpu): """ Get the flags for a given CPU's sched_domains :param cpu: Logical CPU number whose sched_domains' flags we want :returns: Iterator over the flags, as an int, of each of that CPU's domains, highest-level (i.e. typically "DIE") first. """ base_path = '/proc/sys/kernel/sched_domain/cpu{}/'.format(cpu) for domain in sorted(self.target.list_directory(base_path), reverse=True): flags_path = self.target.path.join(base_path, domain, 'flags') yield self.target.read_int(flags_path) def test_share_cap_states(self): """ Check that some domain exists with SD_SHARE_CAP_STATES set EAS silently does nothing if this flag is not set at any level (see use of sd_scs percpu variable in scheduler code). """ cpu0_flags = [] for flags in self.iter_cpu_sd_flags(0): if flags & self.SD_SHARE_CAP_STATES: return cpu0_flags.append(flags) flags_str = ', '.join([hex(f) for f in cpu0_flags]) raise AssertionError('No sched_domain with SD_SHARE_CAP_STATES flag. ' 'flags: {}'.format(flags_str)) def _get_cpu_cap_path(self, cpu): return '/sys/devices/system/cpu/cpu{}/cpu_capacity'.format(cpu) def read_cpu_caps(self): """Get all the CPUs' capacities from sysfs as a list of ints""" return [self.target.read_int(self._get_cpu_cap_path(cpu)) for cpu in range(self.target.number_of_cpus)] def write_cpu_caps(self, caps): """Write all the CPUs' capacites to sysfs from a list of ints""" for cpu, cap in enumerate(caps): self.target.write_value(self._get_cpu_cap_path(cpu), cap) def _test_asym_cpucapacity(self, caps, expect_asym): top_sd_flags = self.iter_cpu_sd_flags(0).next() if expect_asym: self.assertTrue( top_sd_flags & self.SD_ASYM_CPUCAPACITY, 'SD_ASYM_CPUCAPACITY not set on highest sched_domain. ' 'cpu_capacity values: {}'.format(caps)) else: self.assertFalse( top_sd_flags & self.SD_ASYM_CPUCAPACITY, 'SD_ASYM_CPUCAPACITY set unexpectedly on highest sched_domain. ' 'cpu_capacity values: {}'.format(caps)) def test_asym_cpucapacity(self): """ Check that the SD_ASYM_CPUCAPACITY flag gets set when it should SD_ASYM_CPUCAPACITY should be set at least on the highest domain when a system is asymmetric. - Test that it is set appropriately for the current cpu_capacity values - Invert the apparent symmetry of the system by modifying the cpu_capacity sysfs files, and check the flag is inverted. - Finally, revert to the old cpu_capacity values and check the flag returns to its old value. """ if not self.target.file_exists(self._get_cpu_cap_path(0)): raise SkipTest('cpu_capacity info not exposed in sysfs.') old_caps = self.read_cpu_caps() old_caps_asym = any(c != old_caps[0] for c in old_caps[1:]) self._test_asym_cpucapacity(old_caps, old_caps_asym) if old_caps_asym: # Make the (currently asymmetrical) system look symmetrical test_caps = [1024 for _ in range(self.target.number_of_cpus)] else: # Make the (currently symmetrical) system look asymmetrical test_caps = range(self.target.number_of_cpus) # Use a try..finally so that we leave the cpu_capacity files as we found # them, even if the test fails (i.e. we raise an AssertionError). try: self.write_cpu_caps(test_caps) self._test_asym_cpucapacity(old_caps, not old_caps_asym) finally: self.write_cpu_caps(old_caps) self._test_asym_cpucapacity(old_caps, old_caps_asym)