1import json 2import math 3import os.path 4import sqlite3 5import logging 6 7import pytest 8 9 10class Test: 11 12 @pytest.mark.parametrize('setup_teardown', [None], indirect=True) 13 def test(self, setup_teardown, device): 14 #return 15 check_list_file = os.path.join(device.resource_path, 'apl_check_list.json') 16 logging.info('reading {}'.format(check_list_file)) 17 json_data = json.load(open(check_list_file, 'r')) 18 whitelist_dict = {} 19 for data in json_data: 20 whitelist_dict.update({data.get('bundle&processName'): int(data.get('apl'))}) 21 assert whitelist_dict, 'check list is empty' 22 23 logging.info('exporting access_token.db') 24 device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db') 25 device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db-wal') 26 device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db-shm') 27 db_file = os.path.join(device.report_path, 'access_token.db') 28 assert os.path.exists(db_file), '{} not exist'.format(db_file) 29 30 logging.info('exporting nativetoken.json') 31 SA_INFO_FILE = "/data/service/el0/access_token/nativetoken.json" 32 device.hdc_file_recv(SA_INFO_FILE) 33 sa_info_file = os.path.join(device.report_path, 'nativetoken.json') 34 assert os.path.exists(sa_info_file), '{} not exist'.format(sa_info_file) 35 36 logging.info('querying from hap_token_info_table') 37 hap_apl_result = self.query_records(db_file, 'select bundle_name,apl from hap_token_info_table') 38 assert hap_apl_result, 'hap_token_info_table is empty' 39 logging.info('querying native_token_info_table') 40 native_apl_result = self.get_sa_info(sa_info_file) 41 assert native_apl_result, 'native_token_info_table is empty' 42 43 logging.info('hap apl checking') 44 hap_check_rst = self.compare_db_with_whitelist(hap_apl_result, whitelist_dict, 1) 45 logging.info('native apl checking') 46 native_check_rst = self.compare_db_with_whitelist(native_apl_result, whitelist_dict, 2) 47 assert hap_check_rst, 'hap apl check failed' 48 assert native_check_rst, 'native apl check failed' 49 50 @staticmethod 51 def get_sa_info(sa_info_file): 52 result_dict = {} 53 with open(sa_info_file, 'r') as file: 54 data = json.load(file) 55 for item in data: 56 processName = item.get('processName') 57 APL = item.get('APL') 58 result_dict.update({processName: APL}) 59 return result_dict 60 61 @staticmethod 62 def query_records(db_file, sql): 63 conn = sqlite3.connect(db_file) 64 assert conn, 'sqlit database connect failed' 65 cursor = conn.cursor() 66 cursor.execute(sql) 67 results = cursor.fetchall() 68 conn.close() 69 if not results: 70 return 71 result_dict = {} 72 for line in results: 73 key = line[0] 74 value = 0 if math.isnan(line[1]) else line[1] 75 result_dict.update({key: value}) 76 return result_dict 77 78 @staticmethod 79 def compare_db_with_whitelist(db_data: dict, whitelist_dict: dict, basic_value): 80 """ 81 数据库和白名单对比 82 :param db_data: 83 :param whitelist_dict: 84 :param basic_value: 85 :return: 86 """ 87 check_rst = True 88 for key, apl in db_data.items(): 89 if key not in whitelist_dict.keys(): 90 continue 91 if apl <= basic_value: 92 continue 93 is_pass = whitelist_dict[key] == apl 94 if not is_pass: 95 logging.info('bundleName/processName = {} apl = {} | check failed'.format(key, apl)) 96 check_rst = False 97 else: 98 logging.info('bundleName/processName = {} apl = {} | check pass'.format(key, apl)) 99 return check_rst 100