• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (c) 2022-2023 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15import json
16import os
17import getopt
18import sys
19import contextlib
20
# Hash tables of the services that still need to be validated, keyed by service name.
# handle_services() fills them while the cfg files are parsed; container_validate()
# removes every entry that passes, so whatever remains is reported as a failure.
PRIVILEGE_HASH = {}
CRITICAL_HASH = {}
24
25
class CfgValidateError(Exception):
    """
    When the process list verification fails, throw this exception

    name: short category of the failure
    reason: description of why the verification failed
    """
    def __init__(self, name, reason):
        # Forward both values to Exception so that args is populated; with an
        # empty args tuple the traceback shows no message and the exception
        # cannot be copied or pickled (its __init__ needs two arguments).
        super().__init__(name, reason)
        self.name = name
        self.reason = reason

    def __str__(self):
        return "{}: {}".format(self.name, self.reason)
34
35
class CfgItem:
    """
    CfgItem is the value of HASH, representing the interested fields of a service read from a cfg file
    """

    def __init__(self, loc=""):
        """
        Create an empty item.
        loc is the location of the cfg file the service was read from; it is only
        used in the printed reports and defaults to an empty string.
        """
        # Python keeps only the last definition of a method, so a single
        # constructor with a default replaces the former pair of __init__.
        self.uid = ""
        self.gid = []
        self.need_verified = False
        self.enabled_critical = False
        self.loc = loc
        self.critical = []
        self.related_item = ProcessItem()

    @classmethod
    def _is_need_verified_uid(cls, uid):
        return uid == "root" or uid == "system"

    @classmethod
    def _is_need_verified_gid(cls, gid):
        # To enable gid-root validate, change it to "return gid == "root""
        return False

    @classmethod
    def _is_need_verified_critical(cls, critical):
        # Only the first element is inspected; critical must not be empty
        return critical[0] == 1

    def set_uid(self, uid):
        """
        Set uid and check it at the same time.
        The uid needs to be validated only if _is_need_verified_uid return True
        """
        if CfgItem._is_need_verified_uid(uid):
            self.uid = uid
            self.need_verified = True

    def append_gid(self, gid):
        """
        Append gid and check it at the same time.
        The gid needs to be validated only if _is_need_verified_gid return True
        """
        if CfgItem._is_need_verified_gid(gid) and gid not in self.gid:
            self.gid.append(gid)
            self.need_verified = True

    def set_critical(self, critical):
        """
        Set critical and check it at the same time.
        The critical needs to be validated only if _is_need_verified_critical return True
        """
        if CfgItem._is_need_verified_critical(critical):
            self.critical = critical
            self.enabled_critical = True

    def handle_socket(self, socket):
        """
        Validate possible field "socket" in the field "services"
        Raise CfgValidateError if a socket uid that needs validation differs
        from the uid already recorded for the service
        """
        for sock in socket:
            if ("uid" in sock) and CfgItem._is_need_verified_uid(sock["uid"]):
                self.need_verified = True
                if self.uid != "" and self.uid != sock["uid"]:
                    print("Error: uid and uid in socket is not same!")
                    print("Cfg location: {}".format(self.loc))
                    raise CfgValidateError("Customization Error", "cfgs check not pass")
                self.uid = sock["uid"]
            if "gid" in sock:
                gids = sock["gid"]
                # A single gid may be given as a plain string. Wrap it so that it is
                # never iterated character by character (append_gid already skips
                # gids that are recorded).
                if isinstance(gids, str):
                    gids = [gids]
                for gid in gids:
                    self.append_gid(gid)

    def record_related_item(self, related_item):
        """
        When its permissions does not match those in process list,
        records the permissions given in process list
        """
        self.related_item = related_item
121
122
class ProcessItem:
    """
    ProcessItem is the data structure of an item read from the process list
    """

    def __init__(self, process_item=None):
        """
        Use the JSON item in the process list to initialize the class
        Without an argument an empty item is created. Otherwise process_item must
        contain "name"; "uid", "gid" and "critical" are optional.
        """
        # Python keeps only the last definition of a method, so this single
        # constructor covers both the empty and the JSON-initialized case.
        if process_item is None:
            self.name = ""
            self.uid = ""
            self.gid = []
            self.critical = []
            return

        self.name = process_item["name"]
        self.uid = process_item.get("uid", "")
        gid = process_item.get("gid", [])
        # A single gid may be given as a plain string; always store a list
        self.gid = [gid] if isinstance(gid, str) else gid
        self.critical = process_item.get("critical", [])

    def verify(self, cfg_item):
        """
        Returns whether the corresponding CFG (cfg_item) has passed the verification
        If this item declares a uid or gid, only uid and gid are compared;
        otherwise, if it declares critical, critical must be equal;
        an item that declares none of them never passes.
        """
        if self.uid or self.gid:
            return self._verify_uid(cfg_item.uid) and self._verify_gid(cfg_item.gid)
        if self.critical:
            return self.critical == cfg_item.critical
        return False

    def _verify_uid(self, uid):
        # Fails only when the cfg asks for root/system and this item grants another uid
        return not ((uid == "root" or uid == "system") and (uid != self.uid))

    def _verify_gid(self, gid):
        # Fails only when the cfg asks for gid root and this item does not grant it
        return not ("root" in gid and "root" not in self.gid)
178
179
def print_privilege_hash():
    """
    Print every service remaining in PRIVILEGE_HASH: its name, the uid and gid
    read from the cfg file, the uid and gid recorded from the process list,
    and the location of the cfg file
    """
    for service_name, cfg_item in PRIVILEGE_HASH.items():
        given = cfg_item.related_item
        print("Name: {}\nuid: {}\ngiven uid: {}\ngid: ".format(service_name, cfg_item.uid, given.uid), end="")

        # Each gid is followed by one space, then the line is closed
        print("".join("{} ".format(gid) for gid in cfg_item.gid))

        print("given gid: ", end=" ")
        print("".join("{} ".format(gid) for gid in given.gid))
        print("Cfg location: {}".format(cfg_item.loc))
        print("")
195
196
def print_critical_hash():
    """
    Print every service remaining in CRITICAL_HASH: the location of its cfg file,
    its name, the critical value read from the cfg file and the critical value
    recorded from the process list
    """
    for service_name, cfg_item in CRITICAL_HASH.items():
        report = (
            "Cfg location: {}\n".format(cfg_item.loc),
            "Name: {}\ncritical: {}\n".format(service_name, cfg_item.critical),
            "Whitelist-allowed critical: {}\n".format(cfg_item.related_item.critical),
        )
        # The pieces already carry their own line breaks
        print(*report, sep="")
        print("")
204
205
def container_validate(process_path, list_name, item_container):
    """
    Load the process list file and check the services in item_container against it

    process_path: path of the process list file in JSON format
    list_name: name of the top-level field that holds the list of process items
    item_container: HASH of the services to validate, keyed by service name.
        Services that pass are removed from it; services that fail stay in it
        and record the permissions given in the process list.
    Raise CfgValidateError if the file has no field called list_name
    """
    with open(process_path, encoding="utf-8") as fp:
        data = json.load(fp)

    if list_name not in data:
        print("Error: {} is not a valid whitelist, it has no field named \"{}\"".format(process_path, list_name))
        raise CfgValidateError("Customization Error", "cfgs check not pass")

    for entry in data[list_name]:
        cfg_item = item_container.get(entry["name"])
        if cfg_item is None:
            # no CfgItem in HASH meet the item in process list
            continue

        allowed_item = ProcessItem(entry)
        if allowed_item.verify(cfg_item):
            # Process field check passed, remove the corresponding service from HASH
            item_container.pop(allowed_item.name)
        else:
            cfg_item.record_related_item(allowed_item)
227
228
def check_container(critical_process_path):
    """
    Report the services that are still in PRIVILEGE_HASH or CRITICAL_HASH
    PRIVILEGE_HASH is checked first. If either HASH is not empty, its content
    is printed and CfgValidateError is raised.
    critical_process_path is only used in the printed message.
    """
    if PRIVILEGE_HASH:
        # The remaining services in HASH do not pass the high-privilege validation
        print("Error: some services are not authenticated. Listed as follow:")
        print_privilege_hash()
        raise CfgValidateError("Customization Error", "cfgs check not pass")

    if not CRITICAL_HASH:
        return

    # The remaining services in HASH do not pass the critical validation
    print(
        "Error: some services do not match with critical whitelist({}).".format(critical_process_path),
        " Directly enable critical or modify enabled critical services are prohibited!",
        " Misconfigured services listed as follow:",
        sep="",
    )
    print_critical_hash()
    raise CfgValidateError("Customization Error", "cfgs check not pass")
248
249
250
def validate_cfg_file(privilege_process_path, critical_process_path, result_path):
    """
    Validate the collected services against both process list files
    For each list: if the file exists, the matching HASH is checked against it;
    if it does not exist, the check is skipped and the HASH is cleared.
    Finally check_container reports whatever is left.
    result_path is accepted but not used here.
    """
    checks = (
        (privilege_process_path, "high_privilege_process_list", PRIVILEGE_HASH,
         "High-privilege process check skipped: file [{}] not exist"),
        (critical_process_path, "critical_reboot_process_list", CRITICAL_HASH,
         "Critical-reboot process check skipped: file [{}] not exist"),
    )
    for list_path, list_name, container, skip_message in checks:
        if os.path.exists(list_path):
            container_validate(list_path, list_name, container)
        else:
            print(skip_message.format(list_path))
            container.clear()

    check_container(critical_process_path)
270
271
def handle_services(filename, field):
    """
    Build a CfgItem from one entry of the field "services" and register it
    filename is kept as the cfg location of the item; field is the JSON object
    of the service and must contain "name".
    The item is stored under the service name in PRIVILEGE_HASH and/or
    CRITICAL_HASH, depending on which checks it requires.
    """
    service = CfgItem(filename)
    service_name = field['name']

    # The uid must be set before the sockets are handled, because
    # handle_socket compares the socket uid with it
    if "uid" in field:
        service.set_uid(field["uid"])

    if "gid" in field:
        gids = field["gid"]
        if isinstance(gids, str):
            gids = [gids]
        for gid in gids:
            service.append_gid(gid)

    if "socket" in field:
        service.handle_socket(field["socket"])

    if "critical" in field:
        service.set_critical(field["critical"])

    registrations = (
        # Services that need to check permissions are added to HASH
        (service.need_verified, PRIVILEGE_HASH),
        # Services that need to check critical are added to HASH
        (service.enabled_critical, CRITICAL_HASH),
    )
    for wanted, container in registrations:
        if wanted:
            container[service_name] = service
295
296
def parse_cfg_file(filename):
    """
    Load the cfg file in JSON format
    Every entry of its field "services" is passed to handle_services.
    A file that is not valid JSON is reported and skipped; a file without
    the field "services" is skipped silently.
    """
    with open(filename, encoding="utf-8") as fp:
        try:
            data = json.load(fp)
        except json.decoder.JSONDecodeError:
            print("\nError: loading cfg file {} failed. Cfg file not in JSON format!\n".format(filename))
            # Nothing was loaded, so there is nothing to inspect; falling
            # through would fail on the unbound name "data"
            return

    if "services" not in data:
        return
    for field in data['services']:
        handle_services(filename, field)
311
312
def iterate_cfg_folder(cfg_dir):
    """
    Parse every file in cfg_dir whose name ends with ".cfg"
    Only the entries directly inside cfg_dir are considered.
    """
    cfg_names = [entry for entry in os.listdir(cfg_dir) if entry.endswith(".cfg")]
    for cfg_name in cfg_names:
        parse_cfg_file("{}/{}".format(cfg_dir, cfg_name))
318
319
def main():
    """
    Parse the command line, collect the services from both cfg folders and
    validate them against the process lists

    Options: --sys-cfg-folder, --vendor-cfg-folder,
    --high-privilege-process-list-path, --critical-reboot-process-list-path
    and --result-path (accepted but not used).
    If one of the cfg folders does not exist the check is skipped.
    Raise getopt.GetoptError if an option is unknown or a needed one is missing.
    """
    opts, _ = getopt.getopt(sys.argv[1:], '', ['sys-cfg-folder=', 'vendor-cfg-folder=',
        'high-privilege-process-list-path=', 'critical-reboot-process-list-path=', 'result-path='])

    # getopt returns the options in command-line order, so look them up by
    # name instead of by position
    options = dict(opts)

    def required(option):
        if option not in options:
            raise getopt.GetoptError("option {} is required".format(option), option)
        return options[option]

    sys_cfg_folder = required('--sys-cfg-folder')
    if not os.path.exists(sys_cfg_folder):
        print("Process field check skipped: file [{}] not exist".format(sys_cfg_folder))
        return

    vendor_cfg_folder = required('--vendor-cfg-folder')
    if not os.path.exists(vendor_cfg_folder):
        print("Process field check skipped: file [{}] not exist".format(vendor_cfg_folder))
        return

    privilege_process_path = required('--high-privilege-process-list-path')
    critical_process_path = required('--critical-reboot-process-list-path')

    iterate_cfg_folder(sys_cfg_folder)
    iterate_cfg_folder(vendor_cfg_folder)
    validate_cfg_file(privilege_process_path, critical_process_path, None)
343
# Run the validation only when this file is executed as a script
if __name__ == "__main__":
    main()
347
348