1import json 2import os 3import time 4import logging 5import pytest 6import sqlite3 7 8 9class Test: 10 @pytest.mark.parametrize('setup_teardown', [None], indirect=True) 11 def test(self, setup_teardown, device): 12 t = time.time() 13 14 check_list_file = os.path.join(device.resource_path, 'acl_whitelist.json') 15 assert os.path.exists(check_list_file), '{} not exist'.format(check_list_file) 16 logging.info('reading {} content'.format(check_list_file)) 17 whitelist_dict = {} 18 json_data = json.load(open(check_list_file, 'r')) 19 for item in json_data: 20 whitelist_dict.update({item.get('processName'): item.get('acls')}) 21 22 logging.info('exporting access_token.db') 23 device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db') 24 device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db-wal') 25 device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db-shm') 26 db_file = os.path.join(device.report_path, 'access_token.db') 27 assert os.path.exists(db_file), '{} not exist'.format(db_file) 28 29 logging.info('exporting permission_definitions.json') 30 DEFINE_PERMISSION_FILE = "/system/etc/access_token/permission_definitions.json" 31 device.hdc_file_recv(DEFINE_PERMISSION_FILE) 32 perm_def_file = os.path.join(device.report_path, 'permission_definitions.json') 33 assert os.path.exists(perm_def_file), '{} not exist'.format(perm_def_file) 34 35 logging.info('exporting nativetoken.json') 36 SA_INFO_FILE = "/data/service/el0/access_token/nativetoken.json" 37 device.hdc_file_recv(SA_INFO_FILE) 38 sa_info_file = os.path.join(device.report_path, 'nativetoken.json') 39 assert os.path.exists(sa_info_file), '{} not exist'.format(sa_info_file) 40 41 logging.info('insert permission_definition_table') 42 self.insert_perm(perm_def_file, db_file) 43 44 logging.info('querying native_token_info_table') 45 46 sa_result = self.query_sa_info(db_file, sa_info_file) 47 assert sa_result, 'native_token_info_table is empty' 48 49 logging.info('querying from native_token_info_table end') 50 51 check_rst = True 52 for process, permission_list in sa_result.items(): 53 if process not in whitelist_dict.keys(): 54 check_rst = False 55 logging.error('processName={} not configured while list permission: {}'.format(process, permission_list)) 56 else: 57 whitelist_set = set(whitelist_dict[process]) 58 permission_set = set(permission_list) 59 not_applied = permission_set.difference(whitelist_set) 60 if not_applied: 61 check_rst = False 62 logging.error('processName={} not configured while list permission: {}'.format(process, not_applied)) 63 64 logging.info("ACL CHECK COST: {}s".format(time.time() - t)) 65 assert check_rst, 'ACL check failed' 66 67 @staticmethod 68 def query_sa_info(db_file, sa_info_file): 69 sql = 'SELECT permission_name, available_level FROM permission_definition_table;' 70 conn = sqlite3.connect(db_file) 71 assert conn, 'sqlit database connect failed' 72 cursor = conn.cursor() 73 cursor.execute(sql) 74 results = cursor.fetchall() 75 conn.close() 76 77 perm_map ={} 78 for item in results: 79 permission_name = item[0] 80 apl = item[1] 81 perm_map[permission_name] = apl 82 83 check_pass = True 84 result_map ={} 85 86 with open(sa_info_file, 'r') as file: 87 data = json.load(file) 88 for item in data: 89 processName = item.get('processName') 90 APL = item.get('APL') 91 if APL == 3: 92 logging.info('{} APL = 3, PASS.'.format(processName)) 93 continue 94 95 permissions = item.get('permissions') 96 97 if not permissions: 98 logging.info('Process {} does not apply for permission, PASS.'.format(processName)) 99 continue 100 101 nativeAcls = item.get('nativeAcls') 102 for perm in permissions: 103 if perm in nativeAcls: 104 continue 105 106 PAPL = perm_map.get(perm) 107 if not PAPL: 108 logging.warning('{} no definition'.format(perm)) 109 continue 110 if PAPL > APL: 111 logging.error('{} invalid is detected in {}'.format(perm, processName)) 112 check_pass = False 113 114 if nativeAcls: 115 result_map[processName] = nativeAcls 116 117 assert check_pass, 'ACL check failed' 118 return result_map 119 120 @staticmethod 121 def insert_perm(perm_def_file, db_file): 122 query_perm_sql = "select permission_name from permission_definition_table;" 123 conn = sqlite3.connect(db_file) 124 assert conn, 'sqlit database connect failed' 125 cursor = conn.cursor() 126 cursor.execute(query_perm_sql) 127 perms_list = cursor.fetchall() 128 conn.close() 129 perms_set = set() 130 for perms in perms_list: 131 perms_set.add(perms[0]) 132 133 sql = 'insert into permission_definition_table(token_id, permission_name, bundle_name, grant_mode, available_level, provision_enable, distributed_scene_enable, label, label_id, description, description_id, available_type) values(560, ?, "xxxx", 1, ?, 1, 1, "xxxx", 1, "xxxx", 1, 1)' 134 sql_data = [] 135 with open(perm_def_file, 'r') as file: 136 data = json.load(file) 137 system_grant_list = data.get('systemGrantPermissions') 138 user_grant_list = data.get('userGrantPermissions') 139 for item in user_grant_list: 140 key = item.get('name') 141 if key not in perms_set: 142 value_str = item.get('availableLevel') 143 value = 1 144 if value_str == "system_core": 145 value = 3 146 elif value_str == "system_basic": 147 value = 2 148 sql_data.append([key, value]) 149 for item in system_grant_list: 150 key = item.get('name') 151 if key not in perms_set: 152 value_str = item.get('availableLevel') 153 value = 1 154 if value_str == "system_core": 155 value = 3 156 elif value_str == "system_basic": 157 value = 2 158 sql_data.append([key, value]) 159 160 logging.warning('insert permission_definition_table size: {}'.format(len(sql_data))) 161 conn = sqlite3.connect(db_file) 162 assert conn, 'sqlit database connect failed' 163 cursor = conn.cursor() 164 cursor.executemany(sql, sql_data) 165 results = cursor.fetchall() 166 conn.commit() 167 conn.close() 168