• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2# Copyright (c) 2022 Huawei Device Co., Ltd.
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from ast import parse
16import json
17import sys
18import os
19import time
20import argparse
21import re
22import subprocess
23import shlex
24import datetime
25
def GetDirSize(dir_path):
    """Return the combined size, in bytes, of the files below dir_path.

    The tree is walked recursively and symbolic links are skipped. When
    dir_path does not exist the failure is reported through PrintToLog and
    the script terminates with exit status 99. The total (divided twice by
    1024 and labelled "M") is logged before the byte count is returned.
    """
    if not os.path.exists(dir_path):
        PrintToLog("\n\nERROR: %s, dir are not exist!!!\n" % dir_path)
        PrintToLog("End of check, test failed!")
        sys.exit(99)

    total_bytes = 0
    for parent, _subdirs, filenames in os.walk(dir_path):
        for filename in filenames:
            file_path = os.path.join(parent, filename)
            # Links are not counted towards the total.
            if os.path.islink(file_path):
                continue
            total_bytes += os.path.getsize(file_path)

    PrintToLog('total size: {:.2f}M'.format(total_bytes/1024/1024))
    return total_bytes
40
def PrintToLog(str):
    """Print a timestamped message and append the same line to the log file.

    The message is prefixed with the current local time, printed to the
    console, and appended to 'L2_mini_test_<device_num>.log' inside
    args.save_path (both values are read from the module-level `args`).

    Args:
        str: The message to log. Any object accepted by str.format() works.
            The name shadows the builtin but is kept so the signature stays
            unchanged for existing callers.
    """
    stamped = "[{}] {}".format(datetime.datetime.now(), str)
    print(stamped)
    log_path = os.path.join(args.save_path, 'L2_mini_test_{}.log'.format(args.device_num))
    # Write straight to the file instead of temporarily rebinding sys.stdout:
    # a failure while writing can no longer leave stdout pointing at the log
    # file, and the `with` block already closes the handle.
    with open(log_path, mode='a', encoding='utf-8') as log_file:
        print(stamped, file=log_file)
51
def EnterCmd(mycmd, waittime = 0, printresult = 1):
    """Run a host command, with one retry, and return its decoded stdout.

    The command is started with subprocess.Popen and given 25 seconds to
    finish. Output is decoded as UTF-8, falling back to GBK (ignoring
    undecodable bytes). If both attempts fail, the string
    'retry failed again' is returned instead of the output.

    Side effects: sets the module-level CmdRetryCnt (1 plus the number of
    failed attempts); when printresult == 1 the command is appended to
    'mini_test_<device_num>.bat' in args.save_path and both command and
    result are logged through PrintToLog.

    Args:
        mycmd: Command line to execute. An empty string is a no-op.
        waittime: Seconds to sleep after the command; 0 disables the wait.
        printresult: 1 to record the command/result, anything else to stay quiet.

    Returns:
        The decoded stdout, 'retry failed again' on failure, or None when
        mycmd is empty.
    """
    if mycmd == "":
        return
    global CmdRetryCnt
    CmdRetryCnt = 1
    result = 'retry failed again'
    for _attempt in range(2):
        # Reset per attempt so the except branch never touches a process
        # handle left over from a previous iteration (or an unbound name
        # when Popen itself fails, e.g. executable not found).
        p = None
        try:
            p = subprocess.Popen(mycmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            raw_out, _unused_err = p.communicate(timeout=25)
            try:
                result = raw_out.decode(encoding="utf-8")
            except UnicodeDecodeError:
                result = raw_out.decode('gbk', errors='ignore')
            break
        except Exception as e:
            result = 'retry failed again'
            PrintToLog(e)
            CmdRetryCnt += 1
            if p is not None:
                p.kill()
                # Reap the killed child and drain its pipes; bounded so a
                # lingering grandchild holding the pipes cannot hang us.
                try:
                    p.communicate(timeout=5)
                except subprocess.TimeoutExpired:
                    pass

    bat_path = os.path.join(args.save_path, 'mini_test_{}.bat'.format(args.device_num))
    if printresult == 1:
        with open(bat_path, mode='a', encoding='utf-8') as cmd_file:
            cmd_file.write(mycmd + '\n')
        PrintToLog(mycmd)
        PrintToLog(result)
        sys.stdout.flush()
    if waittime != 0:
        time.sleep(waittime)
        if printresult == 1:
            # Mirror the delay in the replay script with a ping-based wait.
            with open(bat_path, mode='a', encoding='utf-8') as cmd_file:
                cmd_file.write("ping -n {} 127.0.0.1>null\n".format(waittime))
    return result
87
def EnterShellCmd(shellcmd, waittime = 0, printresult = 1):
    """Run shellcmd on the device selected by args.device_num.

    The command is wrapped as `hdc_std -t <device_num> shell "<shellcmd>"`
    and handed to EnterCmd together with waittime and printresult.

    Returns:
        Whatever EnterCmd returns, or None when shellcmd is empty.
    """
    if shellcmd != "":
        device_cmd = "hdc_std -t {} shell \"{}\"".format(args.device_num, shellcmd)
        return EnterCmd(device_cmd, waittime, printresult)
    return None
93
if __name__ == "__main__":
    # Smoke-test driver. Steps, each of which exits with status 99 on failure:
    #   1. check that the key processes are running on the device,
    #   2. check the memory figure reported by `hidumper --mem`,
    #   3. after 10 seconds, check that the key processes kept their pids,
    #   4. check the size of the system/data/updater/vendor image directories.
    parser = argparse.ArgumentParser(description='manual to this script')
    parser.add_argument('--save_path', type=str, default = 'D:\\DeviceTestTools\\screenshot')
    parser.add_argument('--device_num', type=str, default = 'null')
    # Raw string: same value as before, but without the invalid escape
    # sequences (\w, \o, \p) that newer Python versions warn about.
    parser.add_argument('--archive_path', type=str, default = r'Z:\workspace\ohos_L2\ohos\out\rk3568\packages\phone')
    args = parser.parse_args()

    if args.device_num == 'null':
        # No device given: use the first token of the target listing.
        result = EnterCmd("hdc_std list targets", 1, 0)
        print(result)
        targets = result.split()
        if not targets:
            PrintToLog("ERROR: no device found by 'hdc_std list targets'")
            PrintToLog("End of check, test failed!")
            sys.exit(99)
        args.device_num = targets[0]

    PrintToLog("\n\n########## First check key processes start ##############")
    lose_process = []
    process_pid = {}

    # Processes whose pids are recorded now and compared again later.
    two_check_process_list = ['huks_service', 'hilogd', 'hdf_devmgr', 'samgr', 'foundation', 'accesstoken_ser',]
    # Processes that only have to show up in the `ps` listing once.
    other_process_list = ['softbus_server', 'deviceauth_service']

    for pname in two_check_process_list:
        pids = EnterCmd("hdc_std -t {} shell pidof {}".format(args.device_num, pname), 0, 1)
        pidlist = pids.split() if pids else []
        try:
            # Every token must be an integer; anything else (an error
            # message, the 'retry failed again' marker) means "not found".
            for pid in pidlist:
                int(pid)
        except ValueError:
            pidlist = []
        if pidlist:
            process_pid[pname] = pidlist
        else:
            lose_process.append(pname)

    all_p = EnterShellCmd("ps -elf")
    for pname in other_process_list:
        if pname not in all_p:
            lose_process.append(pname)

    if lose_process:
        PrintToLog("\n\nERROR: %s, These processes are not exist!!!\n" % lose_process)
        PrintToLog("End of check, test failed!")
        sys.exit(99)
    else:
        PrintToLog("First processes check is ok\n")

    # check processes usage
    res = EnterCmd("hdc_std -t {} shell hidumper --mem".format(args.device_num), 0, 1)
    try:
        # First token after the last ':' in the output, divided by 1024
        # (presumably kB -> MB; confirm against the hidumper output format).
        process_usage = int(res.split(':')[-1].split()[0]) / 1024
    except (IndexError, ValueError):
        PrintToLog("ERROR: cannot parse memory usage from 'hidumper --mem' output")
        PrintToLog("End of check, test failed!")
        sys.exit(99)
    if process_usage > 40:
        PrintToLog(
            "ERROR: Processes usage cannot be greater than 40M, but currently it's actually %.2fM" % process_usage)
        sys.exit(99)

    time.sleep(10)

    # key processes second check, and cmp to first check
    PrintToLog("\n\n########## Second check key processes start ##############")
    second_check_lose_process = []
    for pname in two_check_process_list:
        pids = EnterCmd("hdc_std -t {} shell pidof {}".format(args.device_num, pname), 0, 1)
        pidlist = pids.split() if pids else []
        if not pidlist:
            # Process disappeared between the two checks; collect and
            # report all of them together below.
            second_check_lose_process.append(pname)
            continue
        # No try/except around the fatal exits: a bare `except` here used
        # to swallow the SystemExit raised by sys.exit(99).
        if process_pid.get(pname) != pidlist:
            PrintToLog("ERROR: pid of %s is different the first check" % pname)
            PrintToLog("SmokeTest find some fatal problems!")
            PrintToLog("End of check, test failed!")
            sys.exit(99)
        if len(pidlist) != 1:
            PrintToLog("ERROR: pid of %s is not only one" % pname)
            PrintToLog("SmokeTest find some fatal problems!")
            PrintToLog("End of check, test failed!")
            sys.exit(99)

    if second_check_lose_process:
        # Report the collected names, not the last loop variable.
        PrintToLog("ERROR: pid of %s is not exist" % second_check_lose_process)
        PrintToLog("SmokeTest find some fatal problems!")
        PrintToLog("End of check, test failed!")
        sys.exit(99)
    else:
        PrintToLog("Second processes check is ok\n")

    # Image directory size checks: each directory must not exceed 50M.
    # The system directory path is logged first, as before.
    PrintToLog(os.path.normpath(os.path.join(args.archive_path, "system")))
    for part in ("system", "data", "updater", "vendor"):
        target_dir = os.path.normpath(os.path.join(args.archive_path, part))
        ret_size = GetDirSize(target_dir)/1024/1024
        PrintToLog('Size of {} is :{:.2f}M'.format(part, ret_size))
        if ret_size > 50:
            PrintToLog('ERROR: Size of {}({:.2f}M) is over the upper limit(50M)'.format(part, ret_size))
            PrintToLog("End of check, test failed!")
            sys.exit(99)

    PrintToLog("All testcase is ok")
    PrintToLog("End of check, test succeeded!")
    sys.exit(0)
217