• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18
19import codecs
20import os
21import shutil
22import stat
23import sys
24import time
25
26from devicetest.core.error_message import ErrorMessage
27from devicetest.core.exception import DeviceTestError
28from devicetest.core.variables import get_project_path
29from devicetest.log.logger import DeviceTestLog as log
30
31
def get_template_path(template_file_path, isdir=None):
    '''
    @summary: Obtains the absolute path of the template screen cap path.
    @param template_file_path: Absolute or relative path of the template.
                  Backslashes are normalised to "/" before the lookup.
    @param isdir: Obtain the directory: True; Obtain the file: False;
                  None: Ignore the file type
    @return: The located path. A path starting with "resource" that
             cannot be found yields None; every other path is resolved
             through get_resource_path.
    '''
    template_file_path = template_file_path.replace("\\", "/")
    # Shortcut: an absolute path that already points at an existing file.
    if os.path.isabs(template_file_path) \
            and (not isdir and os.path.isfile(template_file_path)):
        return os.path.abspath(template_file_path)

    # remove first str '/'
    if not os.path.isfile(template_file_path) \
            and template_file_path.startswith("/"):
        template_file_path = template_file_path[1:]

    _fol = None
    if template_file_path.startswith("resource"):
        # Skip the first nine characters ("resource" plus one separator).
        path = template_file_path[9:]
        from xdevice import EnvPool
        if EnvPool.resource_path is not None:
            folder = os.path.abspath(EnvPool.resource_path)
            _fol = travesal_folder(folder, path, isdir)
            if _fol is None:
                log.debug("Not found [%s] in env pool path %s, "
                          "continue to find template in resource path." % (
                              path, folder))
        if _fol is None:
            ecotest_resource_path = getattr(sys, "ecotest_resource_path", "")
            if ecotest_resource_path is not None:
                folder = os.path.abspath(ecotest_resource_path)
                _fol = travesal_folder(folder, path, isdir)
                if _fol is None:
                    log.debug("Not found [%s] in resource path %s, "
                              "continue to find template in other path." % (
                                  path, folder))
    else:
        # Forward isdir so the documented file/directory filter is honoured
        # here as well; it used to be dropped on this branch.
        _fol = get_resource_path(template_file_path, isdir)
    log.debug("get template path:{}".format(_fol))
    return _fol
71
72
def get_resource_path(resource_file_path, isdir=None):
    '''
    @summary: Obtains the absolute path of the resource file.
    @param resource_file_path: Absolute or relative path of the resource.
    @param isdir: Obtain the directory: True; Obtain the file: False;
                  None: Ignore the file type
    @return: Absolute path of the first match. Search order: env pool
             resource path, sys.ecotest_resource_path, project resource
             path, test suite path, project path.
    @raise DeviceTestError: the resource is not found in any location.
    '''
    resource_file_path = resource_file_path.replace("\\", "/")
    if os.path.isabs(resource_file_path):
        # An absolute path of the requested kind is returned as is.
        if isdir is None:
            already_there = os.path.exists(resource_file_path)
        elif isdir:
            already_there = os.path.isdir(resource_file_path)
        else:
            already_there = os.path.isfile(resource_file_path)
        if already_there:
            return os.path.abspath(resource_file_path)

    def _lookup(root, miss_message):
        # Search one root and emit the debug hint when nothing matches.
        folder = os.path.abspath(root)
        hit = travesal_folder(folder, resource_file_path, isdir)
        if hit is None:
            log.debug(miss_message % (resource_file_path, folder))
        return hit

    found = None
    from xdevice import EnvPool
    if EnvPool.resource_path is not None:
        found = _lookup(EnvPool.resource_path,
                        "Not found [%s] in env pool path %s, "
                        "continue to find in project resource path.")

    if found is None:
        ecotest_root = getattr(sys, "ecotest_resource_path", "")
        if ecotest_root is not None:
            found = _lookup(ecotest_root,
                            "Not found [%s] in ecotest path %s, "
                            "continue to find in suit path.")

    from devicetest.core.variables import DeccVariable
    # Remaining roots are resolved lazily, only when still needed.
    fallbacks = (
        (lambda: DeccVariable.project.resource_path,
         "Not found [%s] in product path %s, "
         "continue to find in project resource path."),
        (lambda: DeccVariable.project.test_suite_path,
         "Not found [%s] in product path %s, "
         "continue to find in suit resource path."),
        (get_project_path,
         "Not found [%s] in product path %s, "
         "continue to find in project path."),
    )
    for provide_root, miss_message in fallbacks:
        if found is not None:
            break
        found = _lookup(provide_root(), miss_message)

    if found is None:
        log.error(ErrorMessage.Error_01102.Message.en,
                  error_no=ErrorMessage.Error_01102.Code)
        raise DeviceTestError(ErrorMessage.Error_01102.Topic)
    log.debug("get resource path:{}".format(found))
    return found
137
138
def travesal_folder(folder, folder_file_path, isdir=False):
    '''
    @summary: Depth-first search of folder for folder_file_path.
    @param folder: Directory in which the search starts.
    @param folder_file_path: Relative path (or name) to look for.
    @param isdir: Obtain the directory: True; Obtain the file: False;
                  None: Ignore the file type
    @return: Path of the first match, or None when nothing matches or
             folder does not exist. Entries named ".svn" are skipped.
    '''
    candidate = os.path.join(folder, folder_file_path)
    if isdir is None:
        direct_hit = os.path.exists(candidate)
    elif isdir:
        direct_hit = os.path.isdir(candidate)
    else:
        direct_hit = os.path.isfile(candidate)
    if direct_hit:
        return os.path.abspath(candidate)

    if not os.path.exists(folder):
        return None

    for name in os.listdir(folder):
        if name == ".svn":
            continue

        entry = os.path.join(folder, name)
        if os.path.isdir(entry):
            # A directory whose tail equals the requested path is a match
            # when directories are acceptable; otherwise descend into it.
            if (isdir is None or isdir) \
                    and entry.endswith(os.sep + folder_file_path):
                return entry
            nested = travesal_folder(entry, folder_file_path, isdir)
            if nested is not None:
                return nested
            continue
        if os.path.isfile(entry) \
                and entry.endswith(os.sep + folder_file_path) \
                and (isdir is None or not isdir):
            return entry

    return None
169
170
def os_open_file_write(file_path, content, mode="w"):
    '''
    @summary: Write content to file_path through a descriptor created
              with owner read/write permission only.
    @param file_path: Target file; missing parent folders are created.
    @param content: Data to write (str for text modes, bytes for binary).
    @param mode: Mode handed to os.fdopen, e.g. "w", "wb" or "a".
    @raise DeviceTestError: any failure while creating or writing the file.
    '''
    try:
        flags = os.O_WRONLY | os.O_CREAT
        # os.fdopen never truncates or repositions an already opened
        # descriptor, so the open flags must carry the intent of the mode;
        # otherwise a shorter rewrite keeps the tail of the old content.
        if "w" in mode:
            flags |= os.O_TRUNC
        elif "a" in mode:
            flags |= os.O_APPEND
        modes = stat.S_IWUSR | stat.S_IRUSR
        dir_path = os.path.dirname(file_path)
        # A bare file name has an empty dirname; there is nothing to create.
        if dir_path and not os.path.isdir(dir_path):
            os.makedirs(dir_path, exist_ok=True)
        with os.fdopen(os.open(file_path, flags, modes), mode) as fout:
            fout.write(content)

    except Exception as error:
        log.error(ErrorMessage.Error_01214.Message.en,
                  error_no=ErrorMessage.Error_01214.Code,
                  is_traceback=True)
        raise DeviceTestError(ErrorMessage.Error_01214.Topic) from error
186
187
def os_open_file_read(file_path, mode="r"):
    '''
    @summary: Read and return the whole content of file_path.
    @param file_path: File to read.
    @param mode: Mode handed to os.fdopen, e.g. "r" or "rb".
    @raise DeviceTestError: with topic Error_01216 when the file does not
                            exist, with topic Error_01215 for any other
                            failure.
    '''
    read_flags = os.O_RDONLY
    permission = stat.S_IWUSR | stat.S_IRUSR
    try:
        descriptor = os.open(file_path, read_flags, permission)
        with os.fdopen(descriptor, mode) as reader:
            return reader.read()
    except Exception as error:
        # A missing file has its own error entry; everything else is
        # reported through the generic read failure.
        if isinstance(error, FileNotFoundError):
            failure = ErrorMessage.Error_01216
        else:
            failure = ErrorMessage.Error_01215
        log.error(failure.Message.en,
                  error_no=failure.Code,
                  is_traceback=True)
        raise DeviceTestError(failure.Topic) from error
206
207
def save_file(file_path, content):
    '''
    @summary: Write content to file_path in binary mode ("wb") by
              delegating to os_open_file_write.
    '''
    os_open_file_write(file_path, content, mode="wb")
210
211
def create_dir(create_path):
    """
    Make sure a directory exists, creating missing parents as needed.

    Nothing is done when the path already exists, whatever its type.

    Args:
        create_path: The path of the directory to create; "~" is expanded
            and the result is made absolute before use.
    """
    target = os.path.expanduser(create_path)
    target = os.path.abspath(target)
    if os.path.exists(target):
        return
    os.makedirs(target, exist_ok=True)
221
222
def to_file_plus(file_path, content, console=False, level="INFO"):
    """
    @summary: Create file, append "content" to file and add timestamp.
    @param file_path: Log file to append to; its parent folder is created
                      when missing.
    @param content: Text of the log line.
    @param console: When True, the line is also printed to stdout.
    @param level: Level label written between timestamp and content.
    """
    dirname = os.path.dirname(file_path)
    # An empty dirname means the file lives in the current directory.
    if dirname and not os.path.exists(dirname):
        # Another thread may create the folder between the check above and
        # the call below; exist_ok=True makes that race harmless. Any other
        # failure is logged and will surface when the file is opened.
        try:
            os.makedirs(dirname, exist_ok=True)
        except OSError as exception:
            log.error(exception)
    from devicetest.log.logger import get_log_line_timestamp
    timestamp = get_log_line_timestamp()
    data = "%s %s %s\n" % (timestamp, level, content)
    if console:
        # Drop only the single trailing newline appended above; print()
        # adds its own. Slicing two characters cut into the content.
        print(data[:-1])
    # The context manager closes the file even when the write fails.
    with codecs.open(file_path, "a", "utf-8") as srw:
        srw.write(data)
245
246
def to_file(filename, content):
    '''
    @summary: Generate a file: create the parent folder of filename when
              it is missing, then write content in binary mode ("wb").
    '''
    parent = os.path.dirname(filename)
    parent_missing = not os.path.isdir(parent)
    if parent_missing:
        os.makedirs(parent)
    os_open_file_write(filename, content, mode="wb")
255
256
def delfile(filename):
    '''
    @summary: Delete a file, retrying once and then waiting briefly for
              it to disappear.
    @param filename: Path of the file to delete.

    The first failure is logged. If the file is still present, one more
    removal is attempted; an error raised by that retry propagates.
    A final error is logged when the file still exists after the wait.
    '''
    try:
        os.remove(filename)
    except Exception as exception:
        log.error(exception)
        if os.path.isfile(filename):
            # Retry with os.remove on every platform. shutil.rmtree only
            # accepts directories and always fails on a regular file.
            os.remove(filename)

    # Give the file up to 5 * 0.1s to vanish.
    for _ in range(5):
        if not os.path.isfile(filename):
            break
        time.sleep(0.1)

    if os.path.isfile(filename):
        log.error("Delete file %s failed." % filename)
277
278
def delfolder(dirname):
    '''
    @summary: Delete a directory tree, retrying once and then waiting
              briefly for it to disappear.
    @param dirname: Path of the directory to delete.

    The first failure is swallowed. If the directory is still present,
    the removal is attempted again; an error from that retry propagates.
    An error is logged when the directory still exists after the wait.
    '''
    try:
        shutil.rmtree(dirname)
    except Exception:
        still_present = os.path.isdir(dirname)
        if still_present:
            shutil.rmtree(dirname)

    # Poll at most five times, 0.1s apart, while the folder remains.
    polls = 0
    while polls < 5 and os.path.isdir(dirname):
        time.sleep(0.1)
        polls += 1

    if os.path.isdir(dirname):
        log.error("Delete folder %s failed." % dirname)
295
296
def copy_to_folder(src, des):
    """Copy a folder and its children or a file to another folder.

    Args:
        src: File or folder to copy. For a folder, its direct children
            are copied into des (sub-folders recursively).
        des: Destination folder; created when it does not exist.

    An error is logged and nothing is copied when src does not exist or
    des exists but is not a folder.
    """
    src = os.path.normpath(src)
    des = os.path.normpath(des)
    if not os.path.exists(src):
        log.error("No found [%s]" % src)
        return
    if not os.path.exists(des):
        create_dir(des)
    if not os.path.isdir(des):
        log.error("[%s] is not a folder." % des)
        return

    if not os.path.isdir(src):
        shutil.copy(src, des)
        return

    # List the children of the SOURCE folder (the destination was listed
    # before, so nothing useful was copied). The working directory is no
    # longer changed: doing so broke relative src paths and leaked a
    # process-wide side effect to the caller.
    for src_name in os.listdir(src):
        source = os.path.join(src, src_name)
        if os.path.isfile(source):
            shutil.copy(source, des)
        elif os.path.isdir(source):
            shutil.copytree(source, os.path.join(des, src_name))
323
324
def delete_file_folder(src):
    '''
    @summary: Delete files or directories.
    @param src: Path to remove. A regular file is handed to delfile, a
                directory to delfolder; any other path is left untouched.
    '''
    if os.path.isfile(src):
        delfile(src)
        return
    if os.path.isdir(src):
        delfolder(src)
334