• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import time
20from threading import Condition
21
22from _core.environment.device_state import TestDeviceState
23from _core.utils import convert_serial
24
25CHECK_POLL_TIME = 3
26MAX_CHECK_POLL_TIME = 30
27
28
class DeviceStateListener(object):
    """
    Pairs one expected device state with a Condition that is notified
    whenever that exact state is reported through state_changed().
    """

    def __init__(self, expected_state):
        """
        Remember the state to watch for and create the Condition that
        waiters block on.
        """
        self.condition = Condition()
        self.expected_state = expected_state

    def state_changed(self, new_state):
        """
        Wake every thread waiting on the condition when new_state equals
        the expected state; any other state is ignored.
        """
        if not (self.expected_state == new_state):
            return
        # notify_all() requires the condition's lock to be held.
        with self.condition:
            self.condition.notify_all()

    def get_expected_state(self):
        """
        Return the state this listener was created for.
        """
        return self.expected_state
41
42
class DeviceStateMonitor(object):
    """
    Provides facilities for monitoring the state of a Device.

    The monitor caches the last state passed to set_state() and lets
    callers block until a given state is reached. All wait_time
    arguments and the default timeouts are expressed in milliseconds.
    """

    def __init__(self, device):
        """
        Start monitoring from the device's current test_device_state.

        device must expose test_device_state, device_sn, log,
        get_recover_result() and check_recover_result().
        """
        self.state_listener = []
        self.device_state = device.test_device_state
        self.device = device
        # Default timeouts in milliseconds: 1 minute / 6 minutes.
        self.default_online_timeout = 1 * 60 * 1000
        self.default_available_timeout = 6 * 60 * 1000

    def wait_for_device_state(self, state, wait_time):
        """
        Block until the monitored state equals state or wait_time
        milliseconds have passed.

        Returns True if the device is in the requested state when the
        wait ends, False otherwise.
        """
        if self.device_state == state:
            return True
        self.device.log.debug(
            "wait device %s for %s" % (convert_serial(self.device.device_sn),
                                       state))

        listener = DeviceStateListener(state)
        # Register the listener while already holding its condition lock and
        # wait on a predicate. A set_state() call racing with registration
        # then either is seen by the predicate before blocking, or has to
        # wait for the lock and notifies after we started waiting, so the
        # wakeup can no longer be lost.
        with listener.condition:
            self.add_device_state_listener(listener)
            try:
                listener.condition.wait_for(
                    lambda: self.device_state == state, wait_time / 1000)
            finally:
                self.remove_device_state_listener(listener)

        return self.device_state == state

    def wait_for_device_not_available(self, wait_time):
        """
        Wait up to wait_time milliseconds for TestDeviceState.NOT_AVAILABLE.
        """
        return self.wait_for_device_state(TestDeviceState.NOT_AVAILABLE,
                                          wait_time)

    def wait_for_device_online(self, wait_time=None):
        """
        Wait for TestDeviceState.ONLINE; a falsy wait_time selects
        default_online_timeout.
        """
        if not wait_time:
            wait_time = self.default_online_timeout
        return self.wait_for_device_state(TestDeviceState.ONLINE, wait_time)

    def wait_for_boot_complete(self, wait_time):
        """
        Poll the device until check_recover_result() accepts the value
        returned by get_recover_result(), or wait_time milliseconds pass.

        The pause between polls grows by CHECK_POLL_TIME seconds per
        attempt, capped at MAX_CHECK_POLL_TIME, and never extends past the
        deadline. Exceptions raised while polling are logged and the poll
        is retried. Returns True on success, False on timeout.
        """
        counter = 1
        # A monotonic clock keeps the timeout immune to wall-clock changes.
        deadline = time.monotonic() + wait_time / 1000
        self.device.log.debug("wait for boot complete, and wait time: %s ms" %
                              wait_time)
        while time.monotonic() < deadline:
            try:
                result = self.device.get_recover_result(retry=0)
                if self.device.check_recover_result(result):
                    # NOTE(review): fixed 6 s pause after a successful check,
                    # presumably to let the device settle - confirm.
                    time.sleep(6)
                    return True
            except Exception as exception:
                self.device.log.error("wait for boot complete exception: %s"
                                      % exception)
            backoff = min(CHECK_POLL_TIME * counter, MAX_CHECK_POLL_TIME)
            # Do not sleep beyond the deadline the caller asked for.
            remaining = deadline - time.monotonic()
            time.sleep(max(0.0, min(backoff, remaining)))
            counter = counter + 1
        return False

    def wait_for_device_available(self, wait_time=None):
        """
        Wait for the device to come online and then to finish booting,
        sharing a single wait_time budget (milliseconds) between both
        phases. A falsy wait_time selects default_available_timeout.

        Returns True only if both phases succeed within the budget.
        """
        if not wait_time:
            wait_time = self.default_available_timeout
        start_time = time.monotonic()
        if not self.wait_for_device_online(wait_time):
            return False
        elapsed_time = int((time.monotonic() - start_time) * 1000)
        # Only the time left over from the online wait is given to the boot
        # check.
        return bool(self.wait_for_boot_complete(wait_time - elapsed_time))

    def remove_device_state_listener(self, listener):
        """
        Unregister listener; raises ValueError if it is not registered.
        """
        self.state_listener.remove(listener)

    def add_device_state_listener(self, listener):
        """
        Register listener to be told about subsequent state changes.
        """
        self.state_listener.append(listener)

    def set_state(self, new_state):
        """
        Record new_state and report it to every registered listener.
        Falsy values are ignored.
        """
        if not new_state:
            return
        self.device_state = new_state
        # Iterate over a snapshot: listeners are removed (by waiters, or by a
        # listener itself) while notification is in progress, and mutating
        # the list being iterated would skip entries.
        for listener in list(self.state_listener):
            listener.state_changed(new_state)
122