• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (c) 2022 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15import json
16import os
17import getopt
18import sys
19import contextlib
20
# Hash table of the services that still need to be validated, keyed by service name
22CFG_HASH = {}
23
24
class CfgValidateError(Exception):
    """
    Raised when the process list verification fails.

    Attributes:
        name: short category of the failure, as given by the raiser
        reason: text describing why the verification failed
    """
    def __init__(self, name, reason):
        # Hand a readable message to Exception so that str(err) and
        # tracebacks are not empty.
        super().__init__("{}: {}".format(name, reason))
        self.name = name
        self.reason = reason
33
34
class CfgItem:
    """
    CfgItem is the value of CFG_HASH, representing the permissions of a service read from a cfg file

    Attributes:
        uid: the uid of the service; only stored when it needs verification
        gid: the gids of the service that need verification
        need_verified: True once any uid/gid that needs verification was seen
        loc: location of the cfg file the service was read from
        related_item: the ProcessItem recorded for this service, empty by default
    """

    def __init__(self, loc=""):
        # A single constructor with a default: a second "def __init__" in the
        # same class body would silently replace the first one.
        self.uid = ""
        self.gid = []
        self.need_verified = False
        self.loc = loc
        self.related_item = ProcessItem()

    @staticmethod
    def _is_need_verified_uid(uid):
        """
        Return True if a service running with this uid must be validated
        """
        return uid in ("root", "system")

    @staticmethod
    def _is_need_verified_gid(gid):
        """
        Return True if a service holding this gid must be validated
        """
        # To enable gid-root validate, change it to "return gid == "root""
        return False

    def set_uid(self, uid):
        """
        Set uid and check it at the same time.
        The uid needs to be validated only if _is_need_verified_uid return True
        """
        if CfgItem._is_need_verified_uid(uid):
            self.uid = uid
            self.need_verified = True

    def append_gid(self, gid):
        """
        Append gid and check it at the same time.
        The gid needs to be validated only if _is_need_verified_gid return True
        """
        if CfgItem._is_need_verified_gid(gid) and gid not in self.gid:
            self.gid.append(gid)
            self.need_verified = True

    def handle_socket(self, socket):
        """
        Validate possible field "socket" in the field "services"

        socket is iterated entry by entry; each entry may carry "uid" and "gid".
        Raises CfgValidateError when a socket uid that needs verification
        differs from the uid already recorded for the service.
        """
        for entry in socket:
            if ("uid" in entry) and CfgItem._is_need_verified_uid(entry["uid"]):
                self.need_verified = True
                if self.uid != "" and self.uid != entry["uid"]:
                    print("Error: uid and uid in socket is not same!")
                    print("Cfg location: {}".format(self.loc))
                    raise CfgValidateError("Customization Error", "cfgs check not pass")
                self.uid = entry["uid"]
            if "gid" in entry:
                gids = entry["gid"]
                # A single gid may be given as a bare string. Wrap it so the
                # loop below never iterates over its characters.
                if isinstance(gids, str):
                    gids = [gids]
                for gid in gids:
                    self.append_gid(gid)

    def record_related_item(self, related_item):
        """
        When its permissions does not match those in process list,
        records the permissions given in process list
        """
        self.related_item = related_item
107
108
class ProcessItem:
    """
    ProcessItem is the data structure of an item read from the process list

    Attributes:
        name: name of the process
        uid: uid given in the process list, "" when absent
        gid: list of gids given in the process list, empty when absent
    """

    def __init__(self, process_item=None):
        """
        Use the JSON item in the process list to initialize the class.
        Without an item, an empty ProcessItem is created.
        """
        if process_item is None:
            self.name = ""
            self.uid = ""
            self.gid = []
            return

        self.name = process_item["name"]
        self.uid = process_item.get("uid", "")

        gid = process_item.get("gid", [])
        if isinstance(gid, str):
            # A single gid may be given as a bare string; always store a list
            self.gid = [gid]
        else:
            # Copy, so that changing self.gid never alters the caller's data
            self.gid = list(gid)

    def verify(self, cfg_item):
        """
        Returns whether the corresponding CFG (cfg_item) has passed the verification
        """
        return self._verify_uid(cfg_item.uid) and self._verify_gid(cfg_item.gid)

    def _verify_uid(self, uid):
        # Fails only when uid is "root"/"system" and differs from the listed uid
        return not ((uid == "root" or uid == "system") and (uid != self.uid))

    def _verify_gid(self, gid):
        # Fails only when gid contains "root" and the listed gids do not
        return not ("root" in gid and "root" not in self.gid)
154
155
def print_cfg_hash():
    """
    Print every service stored in CFG_HASH: its name, its uid and gids,
    the uid and gids recorded in its related item, and its cfg location.
    """
    for service_name, cfg_item in CFG_HASH.items():
        given = cfg_item.related_item
        header = "Name: {}\nuid: {}\ngiven uid: {}\ngid: ".format(service_name, cfg_item.uid, given.uid)

        # Every gid is followed by one space, including the last one
        own_gids = "".join("{} ".format(gid) for gid in cfg_item.gid)
        print(header + own_gids)

        given_gids = "".join("{} ".format(gid) for gid in given.gid)
        print("given gid: ", given_gids)

        print("Cfg location: {}".format(cfg_item.loc))
        print("")
171
172
def validate_cfg_file(process_path, result_path):
    """
    Load the process list file
    For each item in the list, find out whether there is a CfgItem needs validation in CFG_HASH

    process_path: path of the JSON process list; it must contain the field
        "high_privilege_process_list"
    result_path: not used by this function

    Services that pass the verification are removed from CFG_HASH.
    Raises CfgValidateError when the wanted field is missing, or when any
    service is left in CFG_HASH after all items have been handled.
    """
    # JSON is read as UTF-8 rather than the platform default encoding
    with open(process_path, encoding="utf-8") as fp:
        data = json.load(fp)

    if "high_privilege_process_list" not in data:
        print("Error: {} is not a valid whitelist, it has not a wanted field name".format(process_path))
        raise CfgValidateError("Customization Error", "cfgs check not pass")

    for entry in data["high_privilege_process_list"]:
        cfg_item = CFG_HASH.get(entry["name"])
        if cfg_item is None:
            # no CfgItem in CFG_HASH meet the item in process list
            continue

        process_item = ProcessItem(entry)
        if process_item.verify(cfg_item):
            # Permission check is passed, remove the corresponding service from CFG_HASH
            CFG_HASH.pop(process_item.name)
        else:
            cfg_item.record_related_item(process_item)

    if CFG_HASH:
        # The remaining services in CFG_HASH do not pass the validation
        print("Error: some services are not authenticated. Listed as follow:")
        print_cfg_hash()
        raise CfgValidateError("Customization Error", "cfgs check not pass")
208
209
def handle_services(filename, field):
    """
    Build a CfgItem from one entry of the "services" field of a cfg file.

    filename: the cfg file the entry was read from, kept as the item location
    field: the service entry; "name" is required, "uid", "gid" and "socket"
        are handled when present

    The item is stored in CFG_HASH under the service name only when it
    needs verification.
    """
    service = CfgItem(filename)
    service_name = field['name']

    if "uid" in field:
        service.set_uid(field["uid"])

    if "gid" in field:
        groups = field["gid"]
        # A single gid may be given as a bare string
        if isinstance(groups, str):
            groups = [groups]
        for group in groups:
            service.append_gid(group)

    if "socket" in field:
        service.handle_socket(field["socket"])

    if not service.need_verified:
        return
    # Services that need to check permissions are added to CFG_HASH
    CFG_HASH[service_name] = service
227
228
def parse_cfg_file(filename):
    """
    Load the cfg file in JSON format and handle every entry of its
    "services" field. A file without that field is ignored.
    """
    # JSON is read as UTF-8 rather than the platform default encoding
    with open(filename, encoding="utf-8") as fp:
        data = json.load(fp)

    # The file is already closed here; only the parsed data is needed
    if "services" not in data:
        return
    for field in data['services']:
        handle_services(filename, field)
240
241
def iterate_cfg_folder(cfg_dir):
    """
    Parse every entry of cfg_dir whose name ends with ".cfg".
    Only the direct entries of the folder are looked at.
    """
    cfg_names = [entry for entry in os.listdir(cfg_dir) if entry.endswith(".cfg")]
    for cfg_name in cfg_names:
        parse_cfg_file("{}/{}".format(cfg_dir, cfg_name))
247
248
def main():
    """
    Entry point: read the command line, collect the services of the system
    and vendor cfg folders, then validate them against the process list.

    Options (long form only):
        --sys-cfg-folder, --vendor-cfg-folder,
        --high-privilege-process-list-path: required
        --result-path: accepted, but not used

    The check is skipped (with a message) when any of the three required
    paths does not exist.
    Raises getopt.GetoptError when an option is unknown or a required one
    is missing.
    """
    opts, _ = getopt.getopt(sys.argv[1:], '', [
        'sys-cfg-folder=', 'vendor-cfg-folder=',
        'high-privilege-process-list-path=', 'result-path='])

    # Look options up by name: their order on the command line is not fixed
    options = dict(opts)
    required = ('--sys-cfg-folder', '--vendor-cfg-folder', '--high-privilege-process-list-path')
    for option in required:
        if option not in options:
            raise getopt.GetoptError("option {} is required".format(option), option)

    sys_cfg_folder, vendor_cfg_folder, process_path = [options[option] for option in required]
    for path in (sys_cfg_folder, vendor_cfg_folder, process_path):
        if not os.path.exists(path):
            print("High-privilege process check skipped: file [{}] not exist".format(path))
            return

    iterate_cfg_folder(sys_cfg_folder)
    iterate_cfg_folder(vendor_cfg_folder)
    validate_cfg_file(process_path, None)
273
if __name__ == "__main__":
    # Run the check only when this file is executed as a script
    main()
277
278