#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Print the number of CPUs usable by this process.

The cgroup (v1 or v2) CPU quota is consulted first; when no quota can be
read, the count reported by ``multiprocessing.cpu_count()`` is used instead.
"""

import multiprocessing
import os

# Value returned when the interpreter cannot determine the core count.
_DEFAULT_CPU_COUNT = 8


def detect_cgroup_version():
    """Return "v2" if /sys/fs/cgroup is a cgroup2 filesystem, else "v1".

    The filesystem type is read from the output of the external ``stat``
    command. Any other output (including none at all) yields "v1".
    """
    try:
        with os.popen("stat -fc %T /sys/fs/cgroup") as f:
            fs_type = f.read().strip()
    except (IOError, OSError):
        # The command could not be started or read; fall back to "v1".
        return "v1"
    return "v2" if fs_type == "cgroup2fs" else "v1"


def get_cpuinfo_core_count():
    """Return ``multiprocessing.cpu_count()``, or 8 if it is unavailable."""
    try:
        return multiprocessing.cpu_count()
    except (NotImplementedError, IOError, OSError):
        # cpu_count() is documented to raise NotImplementedError; the I/O
        # errors are still caught because the original handler caught them.
        return _DEFAULT_CPU_COUNT


def _quota_to_cpus(quota, period):
    """Convert a quota/period pair into a whole CPU count.

    Returns None when either value is not positive (for cgroup v1 a quota of
    -1 means "no limit"). Otherwise returns ``quota // period`` but never
    less than 1, so a quota below one full CPU is not reported as 0.
    """
    if quota <= 0 or period <= 0:
        return None
    return max(1, quota // period)


def get_cpu_count_v2():
    """Return the CPU count from the cgroup v2 ``cpu.max`` of this process.

    The cgroup path is taken from the "0::" entry of /proc/self/cgroup.
    Returns None when that file cannot be read or no usable ``cpu.max``
    is found.
    """
    try:
        with open("/proc/self/cgroup") as f:
            entries = f.readlines()
    except (IOError, OSError):
        return None

    for line in entries:
        if not line.startswith("0::"):
            continue
        # Split only once so a path that itself contains "::" stays intact.
        path = line.strip().split("::", 1)[1]
        cpus = read_cgroup_info("/sys/fs/cgroup{}/cpu.max".format(path))
        if cpus is not None:
            return cpus
    return None


def read_cgroup_info(cpu_max_path):
    """Parse a cgroup v2 ``cpu.max`` file into a CPU count.

    Args:
        cpu_max_path: path of a file containing "<quota> <period>", where
            quota may be the literal "max".

    Returns:
        The machine core count when quota is "max"; otherwise
        ``quota // period`` clamped to at least 1. None when the file is
        missing, unreadable, or does not contain two valid fields.
    """
    try:
        with open(cpu_max_path) as cf:
            fields = cf.read().split()
    except (IOError, OSError):
        return None

    if len(fields) != 2:
        return None
    quota, period = fields
    if quota == "max":
        return get_cpuinfo_core_count()
    try:
        return _quota_to_cpus(int(quota), int(period))
    except ValueError:
        return None


def get_cpu_count_v1():
    """Return the CPU count from the cgroup v1 CFS quota files.

    Returns None when the files are missing, unreadable, not integers, or
    when no quota is set (quota of -1) or the period is 0.
    """
    quota_us_file = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"
    period_us_file = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"

    try:
        with open(quota_us_file, "r") as quota, open(period_us_file, "r") as period:
            cfs_quota_us = int(quota.read().strip())
            cfs_period_us = int(period.read().strip())
    except (IOError, OSError, ValueError):
        return None

    return _quota_to_cpus(cfs_quota_us, cfs_period_us)


def get_cpu_count():
    """Return the CPU count for this process, preferring the cgroup quota.

    Falls back to ``get_cpuinfo_core_count()`` whenever the cgroup lookup
    yields nothing.
    """
    if detect_cgroup_version() == "v2":
        result = get_cpu_count_v2()
    else:
        result = get_cpu_count_v1()
    if not result:
        result = get_cpuinfo_core_count()
    return result


def main():
    """Print the detected CPU count to standard output."""
    print(get_cpu_count())


if __name__ == "__main__":
    main()