1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# Copyright (c) 2022 Huawei Device Co., Ltd. 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15import json 16import os 17import getopt 18import sys 19import contextlib 20 21# Store the hash table of the services that need to validate 22CFG_HASH = {} 23 24 25class CfgValidateError(Exception): 26 """ 27 When the process list verification fails, throw this exception 28 """ 29 def __init__(self, name, reason): 30 super().__init__() 31 self.name = name 32 self.reason = reason 33 34 35class CfgItem: 36 """ 37 CfgItem is the value of CFG_HASH, representing the permissions of a service read from a cfg file 38 """ 39 40 def __init__(self): 41 self.uid = "" 42 self.gid = [] 43 self.need_verified = False 44 self.loc = "" 45 self.related_item = ProcessItem() 46 47 def __init__(self, loc): 48 self.uid = "" 49 self.gid = [] 50 self.need_verified = False 51 self.loc = loc 52 self.related_item = ProcessItem() 53 54 @classmethod 55 def _is_need_verified_uid(self, uid): 56 return uid == "root" or uid == "system" 57 58 @classmethod 59 def _is_need_verified_gid(self, gid): 60 # To enable gid-root validate, change it to "return gid == "root"" 61 return False 62 63 def set_uid(self, uid): 64 """ 65 Set uid and check it at the same time. 66 The uid needs to be validated only if _is_need_verified_uid return True 67 """ 68 if CfgItem._is_need_verified_uid(uid): 69 self.uid = uid 70 self.need_verified = True 71 72 def append_gid(self, gid): 73 """ 74 Append gid and check it at the same time. 75 The gid needs to be validated only if _is_need_verified_gid return True 76 """ 77 if CfgItem._is_need_verified_gid(gid) and gid not in self.gid: 78 self.gid.append(gid) 79 self.need_verified = True 80 81 def handle_socket(self, socket): 82 """ 83 Validate possible field "socket" in the field "services" 84 """ 85 for i in socket: 86 if ("uid" in i) and CfgItem._is_need_verified_uid(i["uid"]): 87 self.need_verified = True 88 if self.uid != "" and self.uid != i["uid"]: 89 print("Error: uid and uid in socket is not same!") 90 print("Cfg location: {}".format(self.loc)) 91 raise CfgValidateError("Customization Error", "cfgs check not pass") 92 self.uid = i["uid"] 93 if "gid" in i : 94 if isinstance(i["gid"], str) and i["gid"] not in self.gid: 95 self.append_gid(i["gid"]) 96 continue 97 for item in i["gid"]: 98 self.append_gid(item) 99 100 101 def record_related_item(self, related_item): 102 """ 103 When its permissions does not match those in process list, 104 records the permissions given in process list 105 """ 106 self.related_item = related_item 107 108 109class ProcessItem: 110 """ 111 Processitem is the data structure of an item read from the process list 112 """ 113 def __init__(self): 114 self.name = "" 115 self.uid = "" 116 self.gid = [] 117 118 def __init__(self, process_item=None): 119 """ 120 Use the JSON item in the process list to initialize the class 121 """ 122 if process_item is None: 123 self.name = "" 124 self.uid = "" 125 self.gid = [] 126 return 127 128 self.name = process_item["name"] 129 130 if "uid" in process_item: 131 self.uid = process_item["uid"] 132 else: 133 self.uid = "" 134 if "gid" in process_item: 135 if isinstance(process_item["gid"], str): 136 self.gid = [] 137 self.gid.append(process_item["gid"]) 138 else: 139 self.gid = process_item["gid"] 140 else: 141 self.gid = [] 142 143 def verify(self, cfg_item): 144 """ 145 Returns whether the corresponding CFG (cfg_item) has passed the verification 146 """ 147 return self._verify_uid(cfg_item.uid) and self._verify_gid(cfg_item.gid) 148 149 def _verify_uid(self, uid): 150 return not ((uid == "root" or uid == "system") and (uid != self.uid)) 151 152 def _verify_gid(self, gid): 153 return not ("root" in gid and "root" not in self.gid) 154 155 156def print_cfg_hash(): 157 global CFG_HASH 158 for i in CFG_HASH.items(): 159 print("Name: {}\nuid: {}\ngiven uid: {}\ngid: ".format(i[0], i[1].uid, i[1].related_item.uid), end="") 160 161 for gid in i[1].gid: 162 print(gid, end=" ") 163 print("") 164 165 print("given gid: ", end=" ") 166 for gid in i[1].related_item.gid: 167 print(gid, end=" ") 168 print("") 169 print("Cfg location: {}".format(i[1].loc)) 170 print("") 171 172 173def validate_cfg_file(process_path, result_path): 174 """ 175 Load the process list file 176 For each item in the list, find out whether there is a CfgItem needs validation in CFG_HASH 177 """ 178 global CFG_HASH 179 with open(process_path) as fp: 180 data = json.load(fp) 181 if "high_privilege_process_list" not in data: 182 print("Error: {}is not a valid whilelist, it has not a wanted field name".format(process_path)) 183 raise CfgValidateError("Customization Error", "cfgs check not pass") 184 185 for i in data["high_privilege_process_list"]: 186 if i["name"] not in CFG_HASH : 187 # no CfgItem in CFG_HASH meet the item in process list 188 continue 189 190 temp_item = ProcessItem(i) 191 if temp_item.name not in CFG_HASH: 192 continue 193 194 if temp_item.verify(CFG_HASH.get(temp_item.name)): 195 # Permission check is passed, remove the corresponding service from CFG_HASH 196 CFG_HASH.pop(temp_item.name) 197 else: 198 CFG_HASH.get(temp_item.name).record_related_item(temp_item) 199 200 if CFG_HASH: 201 # The remaining services in CFG_HASH do not pass the validation 202 for i in CFG_HASH.items(): 203 print("Error: some services are not authenticated. Listed as follow:") 204 print_cfg_hash() 205 206 raise CfgValidateError("Customization Error", "cfgs check not pass") 207 return 208 209 210def handle_services(filename, field): 211 global CFG_HASH 212 cfg_item = CfgItem(filename) 213 key = field['name'] 214 if "uid" in field: 215 cfg_item.set_uid(field["uid"]) 216 if "gid" in field: 217 if isinstance(field["gid"], str): 218 cfg_item.append_gid(field["gid"]) 219 else: 220 for item in field["gid"]: 221 cfg_item.append_gid(item) 222 if "socket" in field: 223 cfg_item.handle_socket(field["socket"]) 224 if cfg_item.need_verified: 225 # Services that need to check permissions are added to CFG_HASH 226 CFG_HASH[key] = cfg_item 227 228 229def parse_cfg_file(filename): 230 """ 231 Load the cfg file in JSON format 232 """ 233 with open(filename) as fp: 234 data = json.load(fp) 235 if "services" not in data: 236 return 237 for field in data['services']: 238 handle_services(filename, field) 239 return 240 241 242def iterate_cfg_folder(cfg_dir): 243 for file in os.listdir(cfg_dir): 244 if file.endswith(".cfg"): 245 parse_cfg_file("{}/{}".format(cfg_dir, file)) 246 return 247 248 249def main(): 250 opts, args = getopt.getopt(sys.argv[1:], '', ['sys-cfg-folder=', 'vendor-cfg-folder=', \ 251 'high-privilege-process-list-path=', 'result-path=']) 252 253 sys_cfg_folder = opts[0][1] 254 if not os.path.exists(sys_cfg_folder): 255 print("High-privilege process check skipped: file [{}] not exist".format(sys_cfg_folder)) 256 return 257 258 vendor_cfg_folder = opts[1][1] 259 if not os.path.exists(vendor_cfg_folder): 260 print("High-privilege process check skipped: file [{}] not exist".format(vendor_cfg_folder)) 261 return 262 263 process_path = opts[2][1] 264 if not os.path.exists(process_path): 265 print("High-privilege process check skipped: file [{}] not exist".format(process_path)) 266 return 267 268 iterate_cfg_folder(sys_cfg_folder) 269 iterate_cfg_folder(vendor_cfg_folder) 270 validate_cfg_file(process_path, None) 271 272 return 273 274if __name__ == "__main__": 275 276 main() 277 278