1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# Copyright (c) 2022-2023 Huawei Device Co., Ltd. 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15import json 16import os 17import getopt 18import sys 19import contextlib 20 21# Store the hash table of the services that need to validate 22PRIVILEGE_HASH = {} 23CRITICAL_HASH = {} 24 25 26class CfgValidateError(Exception): 27 """ 28 When the process list verification fails, throw this exception 29 """ 30 def __init__(self, name, reason): 31 super().__init__() 32 self.name = name 33 self.reason = reason 34 35 36class CfgItem: 37 """ 38 CfgItem is the value of HASH, representing the interesetd field of a service read from a cfg file 39 """ 40 41 def __init__(self): 42 self.uid = "" 43 self.gid = [] 44 self.need_verified = False 45 self.enabled_critical = False 46 self.loc = "" 47 self.critical = [] 48 self.related_item = ProcessItem() 49 50 def __init__(self, loc): 51 self.uid = "" 52 self.gid = [] 53 self.need_verified = False 54 self.enabled_critical = False 55 self.loc = loc 56 self.critical = [] 57 self.related_item = ProcessItem() 58 59 @classmethod 60 def _is_need_verified_uid(self, uid): 61 return uid == "root" or uid == "system" 62 63 @classmethod 64 def _is_need_verified_gid(self, gid): 65 # To enable gid-root validate, change it to "return gid == "root"" 66 return False 67 68 @classmethod 69 def _is_need_verified_critical(self, critical): 70 return critical[0] == 1 71 72 def set_uid(self, uid): 73 """ 74 Set uid and check it at the same time. 75 The uid needs to be validated only if _is_need_verified_uid return True 76 """ 77 if CfgItem._is_need_verified_uid(uid): 78 self.uid = uid 79 self.need_verified = True 80 81 def append_gid(self, gid): 82 """ 83 Append gid and check it at the same time. 84 The gid needs to be validated only if _is_need_verified_gid return True 85 """ 86 if CfgItem._is_need_verified_gid(gid) and gid not in self.gid: 87 self.gid.append(gid) 88 self.need_verified = True 89 90 def set_critical(self, critical): 91 if CfgItem._is_need_verified_critical(critical): 92 self.critical = critical 93 self.enabled_critical = True 94 95 def handle_socket(self, socket): 96 """ 97 Validate possible field "socket" in the field "services" 98 """ 99 for i in socket: 100 if ("uid" in i) and CfgItem._is_need_verified_uid(i["uid"]): 101 self.need_verified = True 102 if self.uid != "" and self.uid != i["uid"]: 103 print("Error: uid and uid in socket is not same!") 104 print("Cfg location: {}".format(self.loc)) 105 raise CfgValidateError("Customization Error", "cfgs check not pass") 106 self.uid = i["uid"] 107 if "gid" in i : 108 if isinstance(i["gid"], str) and i["gid"] not in self.gid: 109 self.append_gid(i["gid"]) 110 continue 111 for item in i["gid"]: 112 self.append_gid(item) 113 114 115 def record_related_item(self, related_item): 116 """ 117 When its permissions does not match those in process list, 118 records the permissions given in process list 119 """ 120 self.related_item = related_item 121 122 123class ProcessItem: 124 """ 125 Processitem is the data structure of an item read from the process list 126 """ 127 def __init__(self): 128 self.name = "" 129 self.uid = "" 130 self.gid = [] 131 self.critical = [] 132 133 def __init__(self, process_item=None): 134 """ 135 Use the JSON item in the process list to initialize the class 136 """ 137 if process_item is None: 138 self.name = "" 139 self.uid = "" 140 self.gid = [] 141 self.critical = [] 142 return 143 144 self.name = process_item["name"] 145 146 if "uid" in process_item: 147 self.uid = process_item["uid"] 148 else: 149 self.uid = "" 150 if "gid" in process_item: 151 if isinstance(process_item["gid"], str): 152 self.gid = [] 153 self.gid.append(process_item["gid"]) 154 else: 155 self.gid = process_item["gid"] 156 else: 157 self.gid = [] 158 if "critical" in process_item: 159 self.critical = process_item["critical"] 160 else: 161 self.critical = [] 162 163 def verify(self, cfg_item): 164 """ 165 Returns whether the corresponding CFG (cfg_item) has passed the verification 166 """ 167 if self.uid or self.gid: 168 return self._verify_uid(cfg_item.uid) and self._verify_gid(cfg_item.gid) 169 if self.critical: 170 return self.critical == cfg_item.critical 171 return False 172 173 def _verify_uid(self, uid): 174 return not ((uid == "root" or uid == "system") and (uid != self.uid)) 175 176 def _verify_gid(self, gid): 177 return not ("root" in gid and "root" not in self.gid) 178 179 180def print_privilege_hash(): 181 global PRIVILEGE_HASH 182 for i in PRIVILEGE_HASH.items(): 183 print("Name: {}\nuid: {}\ngiven uid: {}\ngid: ".format(i[0], i[1].uid, i[1].related_item.uid), end="") 184 185 for gid in i[1].gid: 186 print(gid, end=" ") 187 print("") 188 189 print("given gid: ", end=" ") 190 for gid in i[1].related_item.gid: 191 print(gid, end=" ") 192 print("") 193 print("Cfg location: {}".format(i[1].loc)) 194 print("") 195 196 197def print_critical_hash(): 198 global CRITICAL_HASH 199 for i in CRITICAL_HASH.items(): 200 print("Cfg location: {}\n".format(i[1].loc), end="") 201 print("Name: {}\ncritical: {}\n".format(i[0], i[1].critical), end="") 202 print("Whitelist-allowed critical: {}\n".format(i[1].related_item.critical)) 203 print("") 204 205 206def container_validate(process_path, list_name, item_container): 207 with open(process_path) as fp: 208 data = json.load(fp) 209 if list_name not in data: 210 print("Error: {}is not a valid whilelist, it has not a wanted field name".format(process_path)) 211 raise CfgValidateError("Customization Error", "cfgs check not pass") 212 213 for i in data[list_name]: 214 if i["name"] not in item_container : 215 # no CfgItem in HASH meet the item in process list 216 continue 217 218 temp_item = ProcessItem(i) 219 if temp_item.name not in item_container: 220 continue 221 222 if temp_item.verify(item_container.get(temp_item.name)): 223 # Process field check passed, remove the corresponding service from HASH 224 item_container.pop(temp_item.name) 225 else: 226 item_container.get(temp_item.name).record_related_item(temp_item) 227 228 229def check_container(critical_process_path): 230 global PRIVILEGE_HASH 231 global CRITICAL_HASH 232 if PRIVILEGE_HASH: 233 # The remaining services in HASH do not pass the high-privilege validation 234 print("Error: some services are not authenticated. Listed as follow:") 235 print_privilege_hash() 236 237 raise CfgValidateError("Customization Error", "cfgs check not pass") 238 239 if CRITICAL_HASH: 240 # The remaining services in HASH do not pass the critical validation 241 print("Error: some services do not match with critical whitelist({}).".format(critical_process_path), end="") 242 print(" Directly enable critical or modify enabled critical services are prohibited!", end="") 243 print(" Misconfigured services listed as follow:") 244 print_critical_hash() 245 246 raise CfgValidateError("Customization Error", "cfgs check not pass") 247 return 248 249 250 251def validate_cfg_file(privilege_process_path, critical_process_path, result_path): 252 """ 253 Load the process list file 254 For each item in the list, find out whether there is a CfgItem needs validation in HASH 255 """ 256 global PRIVILEGE_HASH 257 global CRITICAL_HASH 258 if not os.path.exists(privilege_process_path): 259 print("High-privilege process check skipped: file [{}] not exist".format(privilege_process_path)) 260 PRIVILEGE_HASH.clear() 261 else: 262 container_validate(privilege_process_path, "high_privilege_process_list", PRIVILEGE_HASH) 263 if not os.path.exists(critical_process_path): 264 print("Critical-reboot process check skipped: file [{}] not exist".format(critical_process_path)) 265 CRITICAL_HASH.clear() 266 else: 267 container_validate(critical_process_path, "critical_reboot_process_list", CRITICAL_HASH) 268 269 check_container(critical_process_path) 270 271 272def handle_services(filename, field): 273 global PRIVILEGE_HASH 274 global CRITICAL_HASH 275 cfg_item = CfgItem(filename) 276 key = field['name'] 277 if "uid" in field: 278 cfg_item.set_uid(field["uid"]) 279 if "gid" in field: 280 if isinstance(field["gid"], str): 281 cfg_item.append_gid(field["gid"]) 282 else: 283 for item in field["gid"]: 284 cfg_item.append_gid(item) 285 if "socket" in field: 286 cfg_item.handle_socket(field["socket"]) 287 if "critical" in field: 288 cfg_item.set_critical(field["critical"]) 289 if cfg_item.need_verified: 290 # Services that need to check permissions are added to HASH 291 PRIVILEGE_HASH[key] = cfg_item 292 if cfg_item.enabled_critical: 293 # Services that need to check critical are added to HASH 294 CRITICAL_HASH[key] = cfg_item 295 296 297def parse_cfg_file(filename): 298 """ 299 Load the cfg file in JSON format 300 """ 301 with open(filename) as fp: 302 try: 303 data = json.load(fp) 304 except json.decoder.JSONDecodeError: 305 print("\nError: loading cfg file {} failed. Cfg file not in JSON format!\n".format(filename)) 306 if "services" not in data: 307 return 308 for field in data['services']: 309 handle_services(filename, field) 310 return 311 312 313def iterate_cfg_folder(cfg_dir): 314 for file in os.listdir(cfg_dir): 315 if file.endswith(".cfg"): 316 parse_cfg_file("{}/{}".format(cfg_dir, file)) 317 return 318 319 320def main(): 321 opts, args = getopt.getopt(sys.argv[1:], '', ['sys-cfg-folder=', 'vendor-cfg-folder=', \ 322 'high-privilege-process-list-path=', 'critical-reboot-process-list-path=', 'result-path=']) 323 324 sys_cfg_folder = opts[0][1] 325 if not os.path.exists(sys_cfg_folder): 326 print("Process field check skipped: file [{}] not exist".format(sys_cfg_folder)) 327 return 328 329 vendor_cfg_folder = opts[1][1] 330 if not os.path.exists(vendor_cfg_folder): 331 print("Process field check skipped: file [{}] not exist".format(vendor_cfg_folder)) 332 return 333 334 privilege_process_path = opts[2][1] 335 336 critical_process_path = opts[3][1] 337 338 iterate_cfg_folder(sys_cfg_folder) 339 iterate_cfg_folder(vendor_cfg_folder) 340 validate_cfg_file(privilege_process_path, critical_process_path, None) 341 342 return 343 344if __name__ == "__main__": 345 346 main() 347 348