#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024-2025 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import shlex
import subprocess
from pathlib import Path

from runner.common_exceptions import InvalidConfiguration, RunnerException
from runner.logger import Log
from runner.utils import remove_prefix

_LOGGER = Log.get_logger(__file__)


class CPUMask:
    """Switch CPU cores on/off through ``hdc ... shell`` and restore them later.

    The mask is a string where the character ``'1'`` at position ``N`` means
    "keep ``cpuN`` online"; any other character means "take it offline".
    ``apply()`` records the current value of every property listed in
    ``props`` for each discovered core before changing anything, and
    ``restore()`` writes those recorded values back.
    """

    # Per-core files under ``cpu_root/cpuN`` that are saved and restored.
    props = ('online',)
    # Directory that holds the ``cpuN`` nodes (queried through ``sh``).
    cpu_root = Path("/") / "sys" / "devices" / "system" / "cpu"
    # Command prefix used for every query; HDC_OPTIONS is read once at import.
    sh = f"hdc {os.environ.get('HDC_OPTIONS', '')} shell"

    def __init__(self, mask: str) -> None:
        """Parse ``mask`` into a list of per-core booleans.

        Raises:
            InvalidConfiguration: if the mask does not enable a single core.
        """
        self.done = False
        self.saved_state: dict[int, dict[str, str]] = {}
        self.cpumask = [flag == '1' for flag in mask]
        if not any(self.cpumask):
            raise InvalidConfiguration(f'Wrong cpumask: {mask}')

    def _shell_cmd(self, *args: str) -> list[str]:
        """Return ``sh`` split into argv form, followed by ``args``.

        ``subprocess.run`` is called without ``shell=True``; on POSIX a plain
        command string would then be taken as the program name, so the command
        has to be passed as a list.
        """
        return [*shlex.split(self.sh), *args]

    def _prop_path(self, cpu: int, prop: str) -> str:
        """Return the path of ``prop`` for core ``cpu`` with forward slashes."""
        # as_posix() keeps '/' separators even when the host runs Windows.
        return (self.cpu_root / f"cpu{cpu}" / prop).as_posix()

    def _get_prop(self, cpu: int, prop: str) -> str:
        """Read one per-core property and return it stripped of whitespace."""
        result = subprocess.run(
            self._shell_cmd('cat', self._prop_path(cpu, prop)),
            capture_output=True, text=True, check=True)
        return result.stdout.strip()

    def _set_prop(self, cpu: int, prop: str, val: str) -> None:
        """Write ``val`` into one per-core property."""
        # The redirection is passed as a single argument so that it is
        # interpreted by the shell started by ``hdc``, not by the host.
        cmd = self._shell_cmd(f"echo {val} > {self._prop_path(cpu, prop)}")
        subprocess.run(cmd, stderr=subprocess.STDOUT, check=True)

    def _list_cores(self) -> list[int]:
        """Return the sorted indices of all ``cpuN`` nodes under ``cpu_root``.

        Raises:
            RunnerException: if no core could be discovered.
        """
        root = self.cpu_root.as_posix()
        result = subprocess.run(
            self._shell_cmd('ls', '-1', '-d', f"{root}/cpu[0-9]*"),
            capture_output=True, text=True, check=True)
        # Lines starting with 'ls:' are error messages from ls, not entries.
        entries = [line for line in result.stdout.splitlines()
                   if not line.startswith('ls:')]
        cores = sorted(
            int(remove_prefix(os.path.split(entry)[1], 'cpu'))
            for entry in entries if root in entry)
        if not cores:
            raise RunnerException('Get cpus failed!')
        return cores

    def apply(self) -> None:
        """Save the current core state, then switch cores according to the mask.

        Cores missing from the mask are treated as "off". Only cores that were
        actually discovered are touched.

        Raises:
            RunnerException: if no core could be discovered.
            subprocess.CalledProcessError: if a command exits with non-zero.
        """
        _LOGGER.all("---before_suite:")

        all_cores = self._list_cores()
        _LOGGER.all(f"all_cores={all_cores}")
        if len(all_cores) > len(self.cpumask):
            Log.short(
                _LOGGER,
                f'Wrong cpumask length for '
                f'{len(all_cores)} cores: {self.cpumask}')
            self.cpumask += [False] * (len(all_cores) - len(self.cpumask))
        _LOGGER.all(f"cpumask={self.cpumask}")
        # save initial cpu state
        self.saved_state = {
            cpu: {prop: self._get_prop(cpu, prop) for prop in CPUMask.props}
            for cpu in all_cores}
        _LOGGER.all(f"saved_state: {self.saved_state}")

        def is_on(core: int) -> bool:
            # Core ids may be sparse or exceed the mask length.
            return core < len(self.cpumask) and self.cpumask[core]

        # update cpu settings: enable first, then disable
        on_cores = [core for core in all_cores if is_on(core)]
        off_cores = [core for core in all_cores if not is_on(core)]
        for core in on_cores:
            self._set_prop(core, 'online', '1')
        for core in off_cores:
            self._set_prop(core, 'online', '0')
        self.done = True

    def restore(self) -> None:
        """Write back the state recorded by ``apply()``.

        Does nothing unless ``apply()`` completed and recorded a state.

        Raises:
            subprocess.CalledProcessError: if a command exits with non-zero.
        """
        _LOGGER.all(f"---after_suite {self.saved_state}")

        if not (self.done and self.saved_state):
            return
        # first bring ALL cores back online
        for core in self.saved_state:
            self._set_prop(core, 'online', '1')
        # then restore all props
        for core, state in self.saved_state.items():
            for prop in CPUMask.props:
                self._set_prop(core, prop, state[prop])