• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import json
19import os
20import shutil
21import platform
22import subprocess
23
24##############################################################################
25##############################################################################
26
27__all__ = ["DeviceShell"]
28
29import zipfile
30
# Quote character wrapped around the command text handed to "hdc shell":
# a double quote on Windows, a single quote on every other platform.
QUOTATION_MARKS = "\"" if platform.system() == 'Windows' else "'"

# Executable name placed at the front of every command line built below.
HDC_TOOLS = "hdc"
37
38
39##############################################################################
40##############################################################################
41
42
def get_package_name(hap_filepath):
    """Return the package name declared in a HAP archive's ``config.json``.

    The HAP file is opened as a zip archive and its top-level ``config.json``
    member is parsed directly from the archive (nothing is extracted to
    disk). The first top-level section of that JSON document that is a
    mapping with a non-empty ``"package"`` entry supplies the result.

    Args:
        hap_filepath: Path to the ``.hap`` file to inspect.

    Returns:
        The package name, or ``""`` when the file does not exist, is not a
        readable zip archive, has no valid ``config.json``, or no section
        declares a package. Problems are reported with ``print`` rather than
        raised.
    """
    if not os.path.exists(hap_filepath):
        print("file %s not exists" % hap_filepath)
        return ""

    try:
        with zipfile.ZipFile(hap_filepath) as zf_desc:
            if "config.json" not in zf_desc.namelist():
                return ""
            # json.loads accepts bytes and detects the UTF encoding itself,
            # so the result does not depend on the platform locale.
            load_dict = json.loads(zf_desc.read("config.json"))
    except (zipfile.BadZipFile, RuntimeError, OSError, ValueError) as error:
        # ValueError covers malformed JSON and undecodable bytes.
        print("Unzip error: %s (%s)" % (hap_filepath, error))
        return ""

    if not isinstance(load_dict, dict):
        return ""

    for profile in load_dict.values():
        # Top-level values are not guaranteed to be mappings; skip the rest.
        if not isinstance(profile, dict):
            continue
        package_name = profile.get("package")
        if package_name:
            return package_name

    return ""
80
81##############################################################################
82##############################################################################
83
84
class DeviceShell:
    """Build and run ``hdc`` command lines addressed to one device.

    ``conn_type`` selects between two command dialects used by the methods
    of this class. When it is truthy the device is addressed with
    ``-t <sn>`` and the sub-commands ``target mount``, ``file send`` and
    ``file recv`` are used. When it is falsy the device is addressed with
    ``-s`` / ``-H`` / ``-P`` and the sub-commands ``remount``, ``push`` and
    ``pull`` are used.

    Creating an instance runs :meth:`init_device`, which executes commands
    against the device right away.
    """

    def __init__(self, conn_type, remote_ip="", device_sn="", repote_port="", name=""):
        """Store the connection settings and prepare the device.

        Args:
            conn_type: Truthy to use the ``-t`` / ``target mount`` dialect,
                falsy to use the ``-s`` / ``-H`` / ``-P`` dialect.
            remote_ip: Remote host; only used when ``conn_type`` is falsy.
            device_sn: Device serial number.
            repote_port: Remote port; only used when ``conn_type`` is falsy.
                The spelling is kept because it is part of the public
                keyword interface.
            name: Free-form name stored on the instance.
        """
        self.conn_type = conn_type
        self.device_sn = device_sn
        self.name = name
        self.test_path = "data/test"
        if conn_type:
            self.device_params = self.get_device_hdc_para(
                device_sn
            )
        else:
            self.device_params = self.get_device_para(
                remote_ip, repote_port, device_sn)
        self.init_device()

    @classmethod
    def get_device_para(cls, remote_ip="", remote_port="",
                        device_sn=""):
        """Return the device-selection arguments for the non-``-t`` dialect.

        Host and port are only emitted when both are non-empty; otherwise
        only ``-s <sn>`` is emitted, or nothing when the serial number is
        empty too.
        """
        if "" == remote_ip or "" == remote_port:
            if "" == device_sn:
                device_para = ""
            else:
                device_para = "-s %s" % device_sn
        else:
            if "" == device_sn:
                device_para = "-H %s -P %s" % (remote_ip, remote_port)
            else:
                device_para = "-H %s -P %s -t %s" % (
                    remote_ip, remote_port, device_sn)
        return device_para

    @classmethod
    def get_device_hdc_para(cls, device_sn=""):
        """Return ``-t <sn>``, or ``""`` when no serial number is given.

        An empty or whitespace-only serial number yields an empty string so
        that the command line never ends up with a dangling ``-t``.
        """
        if not device_sn or not device_sn.strip():
            return ""
        return "-t %s" % device_sn

    def remount(self):
        """Run the remount sub-command that matches ``conn_type``."""
        if self.conn_type:
            remount = "target mount"
        else:
            remount = "remount"
        command = "%s %s %s" % (HDC_TOOLS, self.device_params, remount)
        self.execute_command(command)

    def init_device(self):
        """Remount the device and recreate ``self.test_path`` from scratch."""
        self.remount()
        self.shell('rm -rf %s' % self.test_path)
        self.shell('mkdir -p %s' % self.test_path)
        self.shell('chmod 777 -R %s' % self.test_path)
        self.shell("mount -o rw,remount,rw /%s" % "system")

    def unlock_screen(self):
        """Run ``svc power stayon true`` on the device."""
        self.shell("svc power stayon true")

    def unlock_device(self):
        """Send key event 82, then run ``wm dismiss-keyguard``."""
        self.shell("input keyevent 82")
        self.shell("wm dismiss-keyguard")

    def push_file(self, srcpath, despath):
        """Copy ``srcpath`` to ``despath`` with ``file send`` or ``push``.

        Returns:
            ``True`` when the command exits with status 0, else ``False``.
        """
        if self.conn_type:
            push_args = "file send"
        else:
            push_args = "push"
        command = "%s %s %s %s %s" % (
            HDC_TOOLS,
            self.device_params,
            push_args,
            srcpath,
            despath)
        return self.execute_command(command)

    def pull_file(self, srcpath, despath):
        """Copy ``srcpath`` to ``despath`` with ``file recv`` or ``pull``.

        Returns:
            ``True`` when the command exits with status 0, else ``False``.
        """
        if self.conn_type:
            pull_args = "file recv"
        else:
            pull_args = "pull"
        command = "%s %s %s %s %s" % (
            HDC_TOOLS,
            self.device_params,
            pull_args,
            srcpath,
            despath)
        return self.execute_command(command)

    def lock_screen(self):
        """Run ``svc power stayon false`` on the device."""
        self.shell("svc power stayon false")

    def disable_keyguard(self):
        """Call :meth:`unlock_screen` followed by :meth:`unlock_device`."""
        self.unlock_screen()
        self.unlock_device()

    def shell(self, command=""):
        """Run ``command`` through ``hdc ... shell`` and report success.

        The command text is wrapped in ``QUOTATION_MARKS`` as-is; it is not
        escaped, so it must not contain that quote character itself.

        Returns:
            ``True`` when the command exits with status 0, else ``False``.
        """
        return self.execute_command("%s %s shell %s%s%s" % (
            HDC_TOOLS,
            self.device_params,
            QUOTATION_MARKS,
            command,
            QUOTATION_MARKS))

    @classmethod
    def execute_command(cls, command, print_flag=True, timeout=900):
        """Run ``command`` through the system shell and report success.

        Args:
            command: Complete command line.
            print_flag: When true, echo the command before running it.
            timeout: Seconds to wait before the call is abandoned.

        Returns:
            ``True`` when the exit status is 0. ``False`` for a non-zero
            status or when running the command raised (including a timeout);
            the exception text is printed, not propagated.
        """
        try:
            if print_flag:
                print("command: " + command)
            if subprocess.call(command, shell=True, timeout=timeout) == 0:
                print("results: successed")
                return True
        except Exception as error:
            print("Exception: %s" % str(error))
        print("results: failed")
        return False

    def shell_with_output(self, command=""):
        """Run ``command`` through ``hdc ... shell`` and return its stdout.

        The command text is wrapped in ``QUOTATION_MARKS`` without escaping.
        """
        return self.execute_command_with_output("%s %s shell %s%s%s" % (
            HDC_TOOLS,
            self.device_params,
            QUOTATION_MARKS,
            command,
            QUOTATION_MARKS))

    @classmethod
    def execute_command_with_output(cls, command, print_flag=True):
        """Run ``command`` through the system shell and return its stdout.

        Args:
            command: Complete command line.
            print_flag: When true, echo the command before running it.

        Returns:
            Standard output decoded as UTF-8 with undecodable bytes dropped.
            Always a ``str``; empty when the command printed nothing.
            Standard error is captured and discarded.
        """
        if print_flag:
            print("command: " + command)

        # The context manager closes both pipes and reaps the child even if
        # communicate() raises.
        with subprocess.Popen(command,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              shell=True) as proc:
            data, _ = proc.communicate()

        if isinstance(data, bytes):
            return data.decode('utf-8', 'ignore')
        return data or ""

    @classmethod
    def check_path_legal(cls, path):
        """Return ``path`` wrapped in double quotes if it contains a space.

        Falsy values and paths without spaces are returned unchanged.
        """
        if path and " " in path:
            return "\"%s\"" % path
        return path