• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import json
2import math
3import os.path
4import sqlite3
5import logging
6
7import pytest
8
9
class Test:
    """Checks the apl values stored in a device's access_token.db against a JSON check list."""

    @pytest.mark.parametrize('setup_teardown', [None], indirect=True)
    def test(self, setup_teardown, device):
        """Pull access_token.db from the device and verify hap/native apl values.

        :param setup_teardown: fixture requested indirectly; not used in the body.
        :param device: object providing ``resource_path``, ``report_path`` and
            ``hdc_file_recv`` (defined outside this file).
        """
        # NOTE(review): this unconditional return disables every check below.
        # It is kept as-is because it looks like a deliberate switch-off;
        # confirm whether the test should be re-enabled.
        return
        check_list_file = os.path.join(device.resource_path, 'apl_check_list.json')
        logging.info('reading {}'.format(check_list_file))
        # Use a context manager so the file handle is closed deterministically.
        with open(check_list_file, 'r') as check_list:
            json_data = json.load(check_list)
        # Map bundle/process name -> expected apl (later duplicates win).
        whitelist_dict = {
            data.get('bundle&processName'): int(data.get('apl'))
            for data in json_data
        }
        assert whitelist_dict, 'check list is empty'

        logging.info('exporting access_token.db')
        # The -wal and -shm side files are fetched together with the main db file.
        device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db')
        device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db-wal')
        device.hdc_file_recv('/data/service/el1/public/access_token/access_token.db-shm')
        db_file = os.path.join(device.report_path, 'access_token.db')
        assert os.path.exists(db_file), '{} not exist'.format(db_file)

        logging.info('querying from hap_token_info_table')
        hap_apl_result = self.query_records(db_file, 'select bundle_name,apl from hap_token_info_table')
        assert hap_apl_result, 'hap_token_info_table is empty'
        logging.info('querying native_token_info_table')
        native_apl_result = self.query_records(db_file, 'select process_name,apl from native_token_info_table')
        assert native_apl_result, 'native_token_info_table is empty'

        # Run both comparisons before asserting so both sets of results are logged.
        logging.info('hap apl checking')
        hap_check_rst = self.compare_db_with_whitelist(hap_apl_result, whitelist_dict, 1)
        logging.info('native apl checking')
        native_check_rst = self.compare_db_with_whitelist(native_apl_result, whitelist_dict, 2)
        assert hap_check_rst, 'hap apl check failed'
        assert native_check_rst, 'native apl check failed'

    @staticmethod
    def query_records(db_file, sql):
        """Run ``sql`` against the SQLite file and map column 0 to column 1.

        :param db_file: path of the SQLite database file.
        :param sql: query whose rows have at least two columns (key, value).
        :return: dict of ``row[0] -> row[1]``, with NULL/NaN values replaced by 0;
            ``None`` when the query returns no rows.
        :raises sqlite3.Error: if connecting or executing the query fails.
        """
        # sqlite3.connect raises on failure, so no truthiness check is needed.
        conn = sqlite3.connect(db_file)
        try:
            results = conn.execute(sql).fetchall()
        finally:
            # Always release the connection, even when the query raises.
            conn.close()
        if not results:
            return None
        result_dict = {}
        for row in results:
            value = row[1]
            # SQL NULL arrives as None, which math.isnan() rejects with TypeError,
            # so test for None first and treat it like NaN.
            if value is None or math.isnan(value):
                value = 0
            result_dict[row[0]] = value
        return result_dict

    @staticmethod
    def compare_db_with_whitelist(db_data: dict, whitelist_dict: dict, basic_value):
        """Compare apl values read from the database with the whitelist.

        Entries are skipped when their key is not in the whitelist or when their
        apl is not greater than ``basic_value``; every other entry must equal the
        whitelisted apl.

        :param db_data: mapping of bundle/process name to the apl found in the database.
        :param whitelist_dict: mapping of bundle/process name to the expected apl.
        :param basic_value: apl values less than or equal to this are not checked.
        :return: True when no checked entry differs from the whitelist, else False.
        """
        # NOTE(review): names with apl above basic_value that are absent from the
        # whitelist are silently accepted; confirm that this is intended.
        check_rst = True
        for key, apl in db_data.items():
            if key not in whitelist_dict or apl <= basic_value:
                continue
            if whitelist_dict[key] == apl:
                logging.info('bundleName/processName = {} apl = {} | check pass'.format(key, apl))
            else:
                logging.info('bundleName/processName = {} apl = {} | check failed'.format(key, apl))
                check_rst = False
        return check_rst
83