#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18
19import time
20from threading import Condition
21
22from xdevice_extension._core.environment.device_state import TestDeviceState
23from xdevice_extension._core.utils import convert_serial
24
# Polling back-off used by DeviceStateMonitor.wait_for_boot_complete.
# Both values are passed to time.sleep(), so they are expressed in seconds.
# Base delay between polls; multiplied by the attempt counter.
CHECK_POLL_TIME = 3
# Upper bound for a single delay between two polls.
MAX_CHECK_POLL_TIME = 30
27
28
class DeviceStateListener(object):
    """
    Wakes up waiters once a device reaches one particular state.

    The listener owns a ``threading.Condition``; threads waiting on
    ``condition`` are notified when ``state_changed`` is called with the
    expected state.
    """

    def __init__(self, expected_state):
        """
        :param expected_state: the state this listener is waiting for;
            compared with ``==`` against the states passed to
            ``state_changed``.
        """
        self.expected_state = expected_state
        self.condition = Condition()

    def state_changed(self, new_state):
        """
        Notify every waiter on ``condition`` if ``new_state`` is the
        expected state; any other state is ignored.
        """
        if self.expected_state != new_state:
            return
        # notify_all() requires the condition's lock to be held.
        with self.condition:
            self.condition.notify_all()

    def get_expected_state(self):
        """Return the state this listener is waiting for."""
        return self.expected_state
41
42
class DeviceStateMonitor(object):
    """
    Provides facilities for monitoring the state of a Device.

    The monitor caches the last state reported through ``set_state`` and
    lets callers block until a given state is reached. All ``wait_time``
    arguments and the default timeouts are in milliseconds.
    """

    def __init__(self, device):
        """
        :param device: device object; the monitor reads its
            ``test_device_state``, ``device_sn`` and ``log`` attributes and
            calls its ``get_recover_result`` method.
        """
        self.state_listener = []
        self.device_state = device.test_device_state
        self.device = device
        # Default timeouts in milliseconds (1 minute / 6 minutes).
        self.default_online_timeout = 1 * 60 * 1000
        self.default_available_timeout = 6 * 60 * 1000

    def wait_for_device_state(self, state, wait_time):
        """
        Block until the device is in ``state`` or ``wait_time`` elapses.

        :param state: the state to wait for.
        :param wait_time: maximum time to wait, in milliseconds.
        :return: True if the device is in ``state`` when the wait ends.
        """
        if self.device_state == state:
            return True
        self.device.log.debug(
            "wait device %s for %s" % (convert_serial(self.device.device_sn),
                                       state))

        listener = DeviceStateListener(state)
        with listener.condition:
            # Register while holding the condition: a set_state() call that
            # happens from now on cannot deliver its notification before
            # this thread is actually waiting, and a change that already
            # happened is caught by the predicate's first evaluation.
            self.add_device_state_listener(listener)
            try:
                listener.condition.wait_for(
                    lambda: self.device_state == state, wait_time / 1000)
            finally:
                self.remove_device_state_listener(listener)

        return self.device_state == state

    def wait_for_device_not_available(self, wait_time):
        """
        Wait up to ``wait_time`` milliseconds for the device to become
        ``TestDeviceState.NOT_AVAILABLE``.

        :return: True if that state was reached.
        """
        return self.wait_for_device_state(TestDeviceState.NOT_AVAILABLE,
                                          wait_time)

    def wait_for_device_online(self, wait_time=None):
        """
        Wait for the device to become ``TestDeviceState.ONLINE``.

        :param wait_time: timeout in milliseconds; a falsy value (None or
            0) selects ``default_online_timeout``.
        :return: True if the device is online when the wait ends.
        """
        if not wait_time:
            wait_time = self.default_online_timeout
        return self.wait_for_device_state(TestDeviceState.ONLINE, wait_time)

    def wait_for_boot_complete(self, wait_time):
        """
        Poll the device until it reports that boot has completed.

        Boot is considered complete when the value returned by
        ``device.get_recover_result(retry=0)`` contains ``"1"``. Polls are
        spaced with a linearly growing delay capped at
        ``MAX_CHECK_POLL_TIME`` seconds, and never sleep past the deadline.

        :param wait_time: timeout in milliseconds; when it is not positive
            the device is not polled at all.
        :return: True if boot completion was observed before the timeout.
        """
        self.device.log.debug("wait for boot complete, and wait time: %s ms" %
                              wait_time)
        # Monotonic clock: immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + wait_time / 1000
        if time.monotonic() >= deadline:
            return False

        counter = 1
        while True:
            try:
                result = self.device.get_recover_result(retry=0)
                if "1" in result:
                    return True
            except Exception as exception:
                # Best effort: a failing query is logged and retried.
                self.device.log.error("wait for boot complete exception: %s"
                                      % exception)
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            # Clamp the back-off to the time left so the call does not
            # overshoot the timeout; the loop then polls one last time.
            time.sleep(min(CHECK_POLL_TIME * counter, MAX_CHECK_POLL_TIME,
                           remaining))
            counter = counter + 1

    def wait_for_device_available(self, wait_time=None):
        """
        Wait for the device to be online and to have finished booting.

        :param wait_time: overall timeout in milliseconds shared by both
            phases; a falsy value selects ``default_available_timeout``.
        :return: True if the device came online and reported boot
            completion within the timeout.
        """
        if not wait_time:
            wait_time = self.default_available_timeout
        start_time = int(time.monotonic() * 1000)
        if not self.wait_for_device_online(wait_time):
            return False
        # Only the time not consumed by the online wait is left for boot.
        elapsed_time = int(time.monotonic() * 1000) - start_time
        return self.wait_for_boot_complete(wait_time - elapsed_time)

    def remove_device_state_listener(self, listener):
        """
        Unregister ``listener``.

        :raises ValueError: if the listener is not registered.
        """
        self.state_listener.remove(listener)

    def add_device_state_listener(self, listener):
        """Register ``listener`` to be told about future state changes."""
        self.state_listener.append(listener)

    def set_state(self, new_state):
        """
        Record ``new_state`` and report it to every registered listener.

        A falsy ``new_state`` is ignored.
        """
        if not new_state:
            return
        self.device_state = new_state
        # Iterate over a snapshot: listeners are removed from the list by
        # waiting threads (or by the callback itself), and removing from a
        # list while iterating it would skip the following listener.
        for listener in list(self.state_listener):
            listener.state_changed(new_state)
121