#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import pytest
import sqlite3
import subprocess
import sys
import threading
sys.path.append("..")
from tools.utils import SMALL_TRACE_EXPECTED_SIZE_3, MID_TRACE_EXPECTED_SIZE


# Host-side directories, relative to the directory the tests are run from.
_INPUT_DIR = r'..\inputfiles'
_OUTPUT_DIR = r'..\outputfiles'
# Directory on the device that receives the config and holds the capture.
_DEVICE_DIR = '/data/local/tmp'
# Column indices of a ``SELECT * FROM diskio`` row that every test inspects.
# NOTE(review): the original comment names these rd_count, wr_count,
# rd_count_speed and (presumably) wr_count_speed -- confirm against the
# trace_streamer diskio table schema.
_IO_STAT_COLUMNS = (6, 7, 8, 9)


def task():
    """Run a 10 s hiprofiler capture on the device using config_diskio.txt."""
    subprocess.check_output('hdc shell "hiprofiler_cmd -c /data/local/tmp/config_diskio.txt -o /data/local/tmp/test_diskio.htrace -t 10 -s -k"')


def task_ex():
    """Run a 10 s hiprofiler capture on the device using config_diskio_ex.txt."""
    subprocess.check_output('hdc shell "hiprofiler_cmd -c /data/local/tmp/config_diskio_ex.txt -o /data/local/tmp/test_diskio_ex.htrace -t 10 -s -k"')


def task_no_report():
    """Run a 10 s hiprofiler capture on the device using config_diskio_no_report.txt."""
    subprocess.check_output('hdc shell "hiprofiler_cmd -c /data/local/tmp/config_diskio_no_report.txt -o /data/local/tmp/test_diskio_no_report.htrace -t 10 -s -k"')


def _capture_and_convert(config_name, trace_stem, capture_task):
    """Push a config, capture a trace, pull it back and convert it to SQLite.

    Args:
        config_name: File name of the plugin config under ``inputfiles\\diskio``.
        trace_stem: Base name (no extension) shared by the ``.htrace`` capture
            and the ``.db`` file generated from it.
        capture_task: Zero-argument callable that runs the capture on the
            device and writes ``<trace_stem>.htrace`` into the device directory.

    Returns:
        A ``(trace_path, db_path)`` tuple of host-side relative paths.

    The return codes of the ``hdc`` and ``trace_streamer_db.exe`` invocations
    are not checked here; a failed step surfaces later when the trace or the
    database is inspected.
    """
    trace_path = f'{_OUTPUT_DIR}\\{trace_stem}.htrace'
    db_path = f'{_OUTPUT_DIR}\\{trace_stem}.db'

    subprocess.run(f'hdc file send {_INPUT_DIR}\\diskio\\{config_name} {_DEVICE_DIR}',
                   text=True, encoding="utf-8")

    # The capture runs in a worker thread that is joined immediately, so the
    # call is effectively synchronous. An exception raised inside the worker
    # is reported by ``threading`` but does not propagate to the caller.
    worker = threading.Thread(target=capture_task, args=())
    worker.start()
    worker.join()

    subprocess.run(f'hdc file recv {_DEVICE_DIR}/{trace_stem}.htrace {_OUTPUT_DIR}',
                   text=True, encoding="utf-8")
    subprocess.run(f'{_INPUT_DIR}\\trace_streamer_db.exe {trace_path} -e {db_path}')
    return trace_path, db_path


def _trace_size(trace_path):
    """Return the size in bytes of the pulled ``.htrace`` file."""
    return os.path.getsize(os.path.abspath(trace_path))


def _fetch_diskio_rows(db_path):
    """Return every row of the ``diskio`` table, closing the connection afterwards."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute('SELECT * FROM diskio').fetchall()
    finally:
        # Always release the handle so the .db file is not left open (and
        # locked on Windows) when a query fails.
        conn.close()


class TestHiprofilerDiskioPlugin:
    """Device tests for the hiprofiler diskio plugin under three configs."""

    @pytest.mark.L0
    def test_diskio_plugin(self):
        """Default config: trace exceeds the mid threshold and all stats are positive."""
        trace_path, db_path = _capture_and_convert('config_diskio.txt', 'test_diskio', task)

        # Check the size of the captured trace file.
        assert _trace_size(trace_path) > MID_TRACE_EXPECTED_SIZE

        rows = _fetch_diskio_rows(db_path)
        assert rows
        # Check the I/O statistic columns of every diskio row.
        for row in rows:
            for column in _IO_STAT_COLUMNS:
                assert row[column] > 0

    @pytest.mark.L0
    def test_diskio_plugin_report_ex(self):
        """'ex' config: trace exceeds the small threshold and all stats are zero."""
        trace_path, db_path = _capture_and_convert('config_diskio_ex.txt', 'test_diskio_ex', task_ex)

        # Check the size of the captured trace file.
        assert _trace_size(trace_path) > SMALL_TRACE_EXPECTED_SIZE_3

        rows = _fetch_diskio_rows(db_path)
        assert rows
        # Check the I/O statistic columns of every diskio row.
        for row in rows:
            for column in _IO_STAT_COLUMNS:
                assert row[column] == 0

    @pytest.mark.L0
    def test_diskio_plugin_no_report(self):
        """'no_report' config: trace stays below the small threshold and all stats are zero."""
        trace_path, db_path = _capture_and_convert('config_diskio_no_report.txt', 'test_diskio_no_report',
                                                   task_no_report)

        # Check the size of the captured trace file.
        assert _trace_size(trace_path) < SMALL_TRACE_EXPECTED_SIZE_3

        rows = _fetch_diskio_rows(db_path)
        assert rows
        # Check the I/O statistic columns of every diskio row.
        for row in rows:
            for column in _IO_STAT_COLUMNS:
                assert row[column] == 0