• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import json
2import os
3import time
4import logging
5import pytest
6import sqlite3
7
8
class Test:
    """ACL whitelist check for native processes.

    The test pulls the access-token database, the permission definition file
    and the native token file from the device, then verifies that every ACL
    declared by a native process is listed for that process in
    ``acl_whitelist.json``.
    """

    # Device-side locations of the files that are inspected by the test.
    ACCESS_TOKEN_DB = '/data/service/el1/public/access_token/access_token.db'
    DEFINE_PERMISSION_FILE = "/system/etc/access_token/permission_definitions.json"
    SA_INFO_FILE = "/data/service/el0/access_token/nativetoken.json"

    @pytest.mark.parametrize('setup_teardown', [None], indirect=True)
    def test(self, setup_teardown, device):
        """Fail when a native process declares an ACL that is not whitelisted.

        Args:
            setup_teardown: fixture requested indirectly; not used in the body.
            device: fixture providing ``resource_path``, ``report_path`` and
                ``hdc_file_recv`` (project type, defined outside this file).
        """
        start = time.time()

        check_list_file = os.path.join(device.resource_path, 'acl_whitelist.json')
        assert os.path.exists(check_list_file), '{} not exist'.format(check_list_file)
        logging.info('reading {} content'.format(check_list_file))
        # Use a context manager so the whitelist file handle is not leaked.
        with open(check_list_file, 'r') as file:
            json_data = json.load(file)
        # A whitelist entry without 'acls' is treated as an empty whitelist
        # instead of crashing later on set(None).
        whitelist_dict = {
            item.get('processName'): item.get('acls') or []
            for item in json_data
        }

        logging.info('exporting access_token.db')
        db_file = self._recv_file(device, self.ACCESS_TOKEN_DB)
        # The -wal/-shm side files are pulled as well so that sqlite sees the
        # rows that have not been checkpointed yet; their presence on the host
        # is not asserted, matching the original behaviour.
        self._recv_file(device, self.ACCESS_TOKEN_DB + '-wal', required=False)
        self._recv_file(device, self.ACCESS_TOKEN_DB + '-shm', required=False)

        logging.info('exporting permission_definitions.json')
        perm_def_file = self._recv_file(device, self.DEFINE_PERMISSION_FILE)

        logging.info('exporting nativetoken.json')
        sa_info_file = self._recv_file(device, self.SA_INFO_FILE)

        logging.info('insert permission_definition_table')
        self.insert_perm(perm_def_file, db_file)

        logging.info('querying native_token_info_table')

        sa_result = self.query_sa_info(db_file, sa_info_file)
        assert sa_result, 'native_token_info_table is empty'

        logging.info('querying from native_token_info_table end')

        check_rst = True
        for process, permission_list in sa_result.items():
            if process not in whitelist_dict:
                check_rst = False
                logging.error('processName={} not configured while list permission: {}'.format(process, permission_list))
                continue

            # ACLs declared by the process that the whitelist does not allow.
            not_applied = set(permission_list).difference(whitelist_dict[process])
            if not_applied:
                check_rst = False
                logging.error('processName={} not configured while list permission: {}'.format(process, not_applied))

        logging.info("ACL CHECK COST: {}s".format(time.time() - start))
        assert check_rst, 'ACL check failed'

    @staticmethod
    def _recv_file(device, remote_path, required=True):
        """Pull ``remote_path`` via ``device.hdc_file_recv`` and return the host path.

        The host path is ``device.report_path`` joined with the base name of
        ``remote_path`` (presumably where ``hdc_file_recv`` stores the file --
        confirm against the device fixture).

        Args:
            device: device fixture exposing ``hdc_file_recv`` and ``report_path``.
            remote_path: absolute path of the file on the device.
            required: when true, assert that the file exists on the host
                after the transfer.

        Returns:
            The expected host-side path of the received file.
        """
        device.hdc_file_recv(remote_path)
        local_path = os.path.join(device.report_path, os.path.basename(remote_path))
        if required:
            assert os.path.exists(local_path), '{} not exist'.format(local_path)
        return local_path

    @staticmethod
    def query_sa_info(db_file, sa_info_file):
        """Validate native process permissions and collect their ACLs.

        For every process entry in ``sa_info_file`` whose ``APL`` is not 3 and
        that requests permissions, each permission not covered by the entry's
        ``nativeAcls`` must have an ``available_level`` (read from
        ``permission_definition_table``) that does not exceed the process APL.

        Args:
            db_file: path of the sqlite database holding
                ``permission_definition_table``.
            sa_info_file: path of the JSON file listing native process entries.

        Returns:
            dict mapping ``processName`` to its non-empty ``nativeAcls`` list,
            for the entries that were not skipped.

        Raises:
            AssertionError: when at least one permission level exceeds the
                APL of the process requesting it.
        """
        sql = 'SELECT permission_name, available_level FROM permission_definition_table;'
        # sqlite3.connect raises on failure, so no truthiness check is needed;
        # try/finally guarantees the connection is closed even if the query fails.
        conn = sqlite3.connect(db_file)
        try:
            rows = conn.execute(sql).fetchall()
        finally:
            conn.close()

        perm_map = {permission_name: level for permission_name, level in rows}

        with open(sa_info_file, 'r') as file:
            data = json.load(file)

        check_pass = True
        result_map = {}
        for item in data:
            process_name = item.get('processName')
            apl = item.get('APL')
            if apl == 3:
                logging.info('{} APL = 3, PASS.'.format(process_name))
                continue

            permissions = item.get('permissions')
            if not permissions:
                logging.info('Process {} does not apply for permission, PASS.'.format(process_name))
                continue

            # A missing 'nativeAcls' key is treated as "no ACLs" rather than
            # raising TypeError on the membership test below.
            native_acls = item.get('nativeAcls') or []
            for perm in permissions:
                if perm in native_acls:
                    continue

                perm_apl = perm_map.get(perm)
                if not perm_apl:
                    logging.warning('{} no definition'.format(perm))
                    continue
                if perm_apl > apl:
                    logging.error('{} invalid is detected in {}'.format(perm, process_name))
                    check_pass = False

            if native_acls:
                result_map[process_name] = native_acls

        assert check_pass, 'ACL check failed'
        return result_map

    @staticmethod
    def insert_perm(perm_def_file, db_file):
        """Add permissions missing from ``permission_definition_table``.

        Every entry under ``definePermissions`` in ``perm_def_file`` whose
        ``name`` is not yet present in the table is inserted with placeholder
        column values and an ``available_level`` derived from its
        ``availableLevel`` string (``system_core`` -> 3, ``system_basic`` -> 2,
        anything else -> 1).

        Args:
            perm_def_file: path of the permission definition JSON file.
            db_file: path of the sqlite database to update.
        """
        query_perm_sql = "select permission_name from permission_definition_table;"
        # String literals use single quotes: double-quoted literals are only
        # accepted through a legacy SQLite quirk that some builds disable.
        insert_sql = (
            "insert into permission_definition_table("
            "token_id, permission_name, bundle_name, grant_mode, available_level, "
            "provision_enable, distributed_scene_enable, label, label_id, "
            "description, description_id, available_type) "
            "values(560, ?, 'xxxx', 1, ?, 1, 1, 'xxxx', 1, 'xxxx', 1, 1)"
        )
        level_by_name = {"system_core": 3, "system_basic": 2}

        with open(perm_def_file, 'r') as file:
            data = json.load(file)

        # One connection serves both the lookup and the insert; try/finally
        # closes it even when a statement fails.
        conn = sqlite3.connect(db_file)
        try:
            known_perms = {row[0] for row in conn.execute(query_perm_sql)}

            sql_data = []
            for item in data.get('definePermissions'):
                name = item.get('name')
                if name in known_perms:
                    continue
                level = level_by_name.get(item.get('availableLevel'), 1)
                sql_data.append((name, level))

            logging.warning('insert permission_definition_table size: {}'.format(len(sql_data)))
            conn.executemany(insert_sql, sql_data)
            conn.commit()
        finally:
            conn.close()
157