• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import time
21import platform
22import subprocess
23
24##############################################################################
25##############################################################################
26
27__all__ = ["DeviceAdapter", "HDCDeviceAdapter"]
28
29if platform.system() != 'Windows':
30    QUOTATION_MARKS = "'"
31else:
32    QUOTATION_MARKS = "\""
33USB_TOOLS = "hdc"
34HDC_TOOLS = "hdc"
35
36
37##############################################################################
38##############################################################################
39
40
41def get_package_name(hap_filepath):
42    package_name = ""
43
44    if os.path.exists(hap_filepath):
45        filename = os.path.basename(hap_filepath)
46
47        # unzip the hap file
48        hap_bak_path = os.path.abspath(os.path.join(
49            os.path.dirname(hap_filepath),
50            "%s.bak" % filename))
51        zf_desc = zipfile.ZipFile(hap_filepath)
52        try:
53            zf_desc.extractall(path=hap_bak_path)
54        except RuntimeError as error:
55            print(error)
56        zf_desc.close()
57
58        # verify config.json file
59        app_profile_path = os.path.join(hap_bak_path, "config.json")
60        if os.path.isfile(app_profile_path):
61            load_dict = {}
62            with open(app_profile_path, 'r') as json_file:
63                load_dict = json.load(json_file)
64            profile_list = load_dict.values()
65            for profile in profile_list:
66                package_name = profile.get("package")
67                if not package_name:
68                    continue
69                break
70
71        # delete hap_bak_path
72        if os.path.exists(hap_bak_path):
73            shutil.rmtree(hap_bak_path)
74    else:
75        print("file %s not exists" % hap_filepath)
76
77    return package_name
78
79
80##############################################################################
81##############################################################################
82
83
84class DeviceAdapter:
85    def __init__(self, remote_ip="", repote_port="", device_sn="", name=""):
86        self.device_sn = device_sn
87        self.name = name
88        self.test_path = "/%s/%s" % ("data", "test")
89        self.device_para = self.get_device_para(
90            remote_ip,
91            repote_port,
92            device_sn)
93        self.init_device()
94
95    ###############################################################
96    ###############################################################
97
98    def init_device(self):
99        self.remount()
100        self.shell('rm -rf %s' % self.test_path)
101        self.shell('mkdir -p %s' % self.test_path)
102        self.shell('chmod 777 %s' % self.test_path)
103        self.shell("mount -o rw,remount,rw /%s" % "system")
104
105    def remount(self):
106        command = "%s %s remount" % (USB_TOOLS, self.device_para)
107        self.execute_command(command)
108
109    def push_file(self, srcpath, despath):
110        command = "%s %s push %s %s" % (
111            USB_TOOLS,
112            self.device_para,
113            srcpath,
114            despath)
115        return self.execute_command(command)
116
117    def pull_file(self, srcpath, despath):
118        command = "%s %s pull %s %s" % (
119            USB_TOOLS,
120            self.device_para,
121            srcpath,
122            despath)
123        return self.execute_command(command)
124
125    def unlock_screen(self):
126        self.shell("svc power stayon true")
127
128    def unlock_device(self):
129        self.shell("input keyevent 82")
130        self.shell("wm dismiss-keyguard")
131
132    def lock_screen(self):
133        self.shell("svc power stayon false")
134
135    def disable_keyguard(self):
136        self.unlock_screen()
137        self.unlock_device()
138
139    def install_hap(self, suite_file):
140        file_name = os.path.basename(suite_file)
141        message = self.shell_with_output("bm install -p %s" % os.path.join(
142            self.test_path, file_name))
143        message = str(message).rstrip()
144        if message != "":
145            print(message)
146        if message == "" or "Success" in message:
147            return_code = True
148        else:
149            return_code = False
150        time.sleep(1)
151        return return_code
152
153    def uninstall_hap(self, suite_file):
154        package_name = get_package_name(suite_file)
155        result = self.shell("pm uninstall %s" % package_name)
156        time.sleep(1)
157        return result
158
159    def install_app(self, file_path):
160        command = "%s %s install %s" % (
161            USB_TOOLS,
162            self.device_para,
163            file_path)
164        message = self.execute_command(command)
165        message = str(message).rstrip()
166        if message != "":
167            print(message)
168        if message == "" or "Success" in message:
169            return_code = True
170        else:
171            return_code = False
172        time.sleep(1)
173        return return_code
174
175    def uninstall_app(self, package_name):
176        command = "pm uninstall %s" % (package_name)
177        return_code = self.shell_with_output(command)
178        if return_code:
179            time.sleep(1)
180        return return_code
181
182
183    ###############################################################
184    ###############################################################
185
186    @classmethod
187    def get_device_para(cls, remote_ip="", remote_port="",
188                          device_sn=""):
189        if "" == remote_ip or "" == remote_port:
190            if "" == device_sn:
191                device_para = ""
192            else:
193                device_para = "-s %s" % device_sn
194        else:
195            if "" == device_sn:
196                device_para = "-H %s -P %s" % (remote_ip, remote_port)
197            else:
198                device_para = "-H %s -P %s -s %s" % (
199                    remote_ip, remote_port, device_sn)
200        return device_para
201
202    def execute_command(self, command, print_flag=True, timeout=900):
203        try:
204            if print_flag:
205                print("command: " + command)
206            if subprocess.call(command, shell=True, timeout=timeout) == 0:
207                print("results: successed")
208                return True
209        except Exception as error:
210            print("Exception: %s" % str(error))
211        print("results: failed")
212        return False
213
214    def execute_command_with_output(self, command, print_flag=True):
215        if print_flag:
216            print("command: " + command)
217
218        proc = subprocess.Popen(command,
219            stdout=subprocess.PIPE,
220            stderr=subprocess.PIPE,
221            shell=True)
222
223        try:
224            data, _ = proc.communicate()
225            if isinstance(data, bytes):
226                data = data.decode('utf-8', 'ignore')
227        finally:
228            proc.stdout.close()
229            proc.stderr.close()
230        return data
231
232    def shell(self, command=""):
233        return self.execute_command("%s %s shell %s%s%s" % (
234            USB_TOOLS,
235            self.device_para,
236            QUOTATION_MARKS,
237            command,
238            QUOTATION_MARKS))
239
240    def execute_shell_command(self, command):
241        return self.shell(command)
242
243    def shell_with_output(self, command=""):
244        return self.execute_command_with_output("%s %s shell %s%s%s" % (
245            USB_TOOLS,
246            self.device_para,
247            QUOTATION_MARKS,
248            command,
249            QUOTATION_MARKS))
250
251    def check_path_legal(self, path):
252        if path and " " in path:
253            return "\"%s\"" % path
254        return path
255
256    def is_file_exist(self, file_path):
257        file_path = self.check_path_legal(file_path)
258        message = self.shell_with_output("ls %s" % file_path)
259        return False if message == "" else True
260
261
262##############################################################################
263##############################################################################
264
265
266class HDCDeviceAdapter:
267    def __init__(self, remote_ip="", repote_port="", device_sn="", name=""):
268        self.device_sn = device_sn
269        self.name = name
270        self.test_path = "/%s/%s/" % ("data", "test")
271        self.device_para = self.get_device_para(
272            remote_ip,
273            repote_port,
274            device_sn)
275        self.init_device()
276
277    ###############################################################
278    ###############################################################
279
280    def init_device(self):
281        self.remount()
282        self.shell('rm -rf %s' % self.test_path)
283        self.shell('mkdir -p %s' % self.test_path)
284        self.shell('chmod 777 %s' % self.test_path)
285        self.shell("mount -o rw,remount,rw /%s" % "system")
286
287    def remount(self):
288        command = "%s %s target mount" % (HDC_TOOLS)
289        self.execute_command(command)
290
291    def push_file(self, srcpath, despath):
292        command = "%s %s file send %s %s" % (
293            HDC_TOOLS,
294            self.device_para,
295            srcpath,
296            despath)
297        return self.execute_command(command)
298
299    def pull_file(self, srcpath, despath):
300        command = "%s %s file recv %s %s" % (
301            HDC_TOOLS,
302            self.device_para,
303            srcpath,
304            despath)
305        return self.execute_command(command)
306
307    def unlock_screen(self):
308        self.shell("svc power stayon true")
309
310    def unlock_device(self):
311        self.shell("input keyevent 82")
312        self.shell("wm dismiss-keyguard")
313
314    def lock_screen(self):
315        self.shell("svc power stayon false")
316
317    def disable_keyguard(self):
318        self.unlock_screen()
319        self.unlock_device()
320
321
322    ###############################################################
323    ###############################################################
324
325    @classmethod
326    def get_device_para(cls, remote_ip="", remote_port="",
327                          device_sn=""):
328        if "" == remote_ip or "" == remote_port:
329            if "" == device_sn:
330                device_para = ""
331            else:
332                device_para = "-t %s" % device_sn
333        else:
334            if "" == device_sn:
335                device_para = "-s tcp:%s:%s" % (remote_ip, remote_port)
336            else:
337                device_para = "-s tcp:%s:%s -t %s" % (
338                    remote_ip, remote_port, device_sn)
339        return device_para
340
341    def execute_command(self, command, print_flag=True, timeout=900):
342        try:
343            if print_flag:
344                print("command: " + command)
345            if subprocess.call(command, shell=True, timeout=timeout) == 0:
346                print("results: successed")
347                return True
348        except Exception as error:
349            print("Exception: %s" % str(error))
350        print("results: failed")
351        return False
352
353    def execute_command_with_output(self, command, print_flag=True):
354        if print_flag:
355            print("command: " + command)
356
357        proc = subprocess.Popen(command,
358            stdout=subprocess.PIPE,
359            stderr=subprocess.PIPE,
360            shell=True)
361
362        try:
363            data, _ = proc.communicate()
364            if isinstance(data, bytes):
365                data = data.decode('utf-8', 'ignore')
366        finally:
367            proc.stdout.close()
368            proc.stderr.close()
369        return data
370
371    def shell(self, command=""):
372        return self.execute_command("%s %s shell %s%s%s" % (
373            HDC_TOOLS,
374            self.device_para,
375            QUOTATION_MARKS,
376            command,
377            QUOTATION_MARKS))
378
379    def shell_with_output(self, command=""):
380        return self.execute_command_with_output("%s %s shell %s%s%s" % (
381            HDC_TOOLS,
382            self.device_para,
383            QUOTATION_MARKS,
384            command,
385            QUOTATION_MARKS))
386
387    def check_path_legal(self, path):
388        if path and " " in path:
389            return "\"%s\"" % path
390        return path
391
392    def is_file_exist(self, file_path):
393        file_path = self.check_path_legal(file_path)
394        message = self.shell_with_output("ls %s" % file_path)
395        return False if message == "" else True
396
397
398##############################################################################
399##############################################################################
400
401