• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2016 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from enum import Enum
18from time import sleep
19
20from acts.controllers.relay_lib.errors import RelayConfigError
21
22
class RelayState(Enum):
    """Enum for the possible states of a relay.

    Iterating over this class yields NO first and then NC, which is the
    order in which the members are defined below.
    """
    # Normally open. By this file's convention, treat this as 'OFF'.
    NO = 'NORMALLY_OPEN'
    # Normally closed. By this file's convention, treat this as 'ON'.
    NC = 'NORMALLY_CLOSED'
29
30
class SynchronizeRelays:
    """A context manager that lets relays change state nearly simultaneously.

    Can be used with the 'with' statement in Python:

    with SynchronizeRelays():
        relay1.set_no()
        relay2.set_nc()

    While the block is active, Relay.transition_wait_time is forced to 0 so
    the individual Relay.set() calls do not sleep. If at least one relay
    actually changed state inside the block, the thread sleeps once, for the
    wait time that was in effect before the block was entered, when execution
    leaves the 'with' statement.

    The bookkeeping lives in class attributes (Relay.transition_wait_time and
    SynchronizeRelays._sync_sleep_flag), so it applies to every Relay in the
    process, not only to the relays touched inside the block.
    """
    # Set to True by Relay.set() whenever a relay really changes state.
    _sync_sleep_flag = False

    def __enter__(self):
        """Saves the current wait settings and disables per-relay waiting.

        Returns:
            This SynchronizeRelays instance.
        """
        self.prev_toggle_time = Relay.transition_wait_time
        self.prev_sync_flag = SynchronizeRelays._sync_sleep_flag
        Relay.transition_wait_time = 0
        SynchronizeRelays._sync_sleep_flag = False
        return self

    def __exit__(self, type, value, traceback):
        """Restores the saved settings, then waits once if a relay changed.

        Args:
            type: The exception type raised inside the block, or None.
            value: The exception instance raised inside the block, or None.
            traceback: The traceback of that exception, or None.

        Returns:
            None, so an exception raised inside the block is not suppressed.
        """
        relay_changed = SynchronizeRelays._sync_sleep_flag

        # Restore before sleeping: Relay.transition_wait_time is still the 0
        # installed by __enter__, so sleeping on it would not wait at all.
        # Restoring first also leaves the class state consistent if the sleep
        # is interrupted.
        Relay.transition_wait_time = self.prev_toggle_time

        # When the restored wait time is 0 (for example, this block is nested
        # inside another SynchronizeRelays block) no real wait happens here,
        # so keep the "a relay changed" marker for the enclosing block.
        wait_deferred = relay_changed and not self.prev_toggle_time
        SynchronizeRelays._sync_sleep_flag = (self.prev_sync_flag
                                              or wait_deferred)

        if relay_changed:
            sleep(self.prev_toggle_time)
57
58
class Relay(object):
    """A class representing a single relay switch on a RelayBoard.

    References to these relays are stored in both the RelayBoard and the
    RelayDevice classes under the variable "relays". GenericRelayDevice can also
    access these relays through the subscript ([]) operator.

    At the moment, relays only have a valid state of 'ON' or 'OFF' (see
    RelayState). This may be extended in a subclass if needed. Keep in mind
    that if this is done, changes will also need to be made in the
    RelayRigParser class to initialize the relays.

    The first successful call to set() records the state reported by the relay
    board at that moment; clean_up() switches the relay back to that state.
    """
    # How long to wait, in seconds, for a relay to transition state.
    transition_wait_time = .2
    # Default hold time, in seconds, for set_no_for() and set_nc_for().
    button_press_time = .25

    def __init__(self, relay_board, position):
        """Creates a relay bound to one position on a relay board.

        Args:
            relay_board: The board that owns this relay. This class uses its
                name attribute and its get_relay_status(position) and
                set(position, state) methods.
            position: The identifier of this relay on the board.
        """
        self.relay_board = relay_board
        self.position = position
        # State reported by the board before the first change made via set().
        self._original_state = None
        self.relay_id = "%s/%s" % (self.relay_board.name, self.position)

    def set_no(self):
        """Sets the relay to the 'NO' state. Shorthand for set(RelayState.NO).

        Blocks the thread for Relay.transition_wait_time if the state changes.
        """
        self.set(RelayState.NO)

    def set_nc(self):
        """Sets the relay to the 'NC' state. Shorthand for set(RelayState.NC).

        Blocks the thread for Relay.transition_wait_time if the state changes.
        """
        self.set(RelayState.NC)

    def toggle(self):
        """Swaps the state from 'NO' to 'NC' or 'NC' to 'NO'.

        Any reported status other than RelayState.NO results in a switch to
        RelayState.NO. Blocks the thread for Relay.transition_wait_time if the
        state changes.
        """
        if self.get_status() == RelayState.NO:
            self.set(RelayState.NC)
        else:
            self.set(RelayState.NO)

    def set(self, state):
        """Sets the relay to the 'NO' or 'NC' state.

        Does nothing further if the relay board already reports the requested
        state. Otherwise the board is told to switch, and the thread blocks
        for Relay.transition_wait_time.

        Args:
            state: either RelayState.NO or RelayState.NC.

        Raises:
            ValueError if state is not RelayState.NO or RelayState.NC.
        """
        # Validate before touching any state so that a rejected call neither
        # queries the board nor marks this relay as dirty.
        if state is not RelayState.NO and state is not RelayState.NC:
            raise ValueError(
                'Invalid state. Received "%s". Expected any of %s.' %
                (state, list(RelayState)))

        # Remember the state seen before the first change, for clean_up().
        if self._original_state is None:
            self._original_state = self.relay_board.get_relay_status(
                self.position)

        if self.get_status() != state:
            self.relay_board.set(self.position, state)
            # Tells an enclosing SynchronizeRelays block that a wait is owed.
            SynchronizeRelays._sync_sleep_flag = True
            sleep(Relay.transition_wait_time)

    # NOTE: the default below is bound when the class body is executed, so
    # later changes to Relay.button_press_time do not alter it.
    def set_no_for(self, seconds=button_press_time):
        """Sets the relay to 'NORMALLY_OPEN' for seconds. Blocks the thread.

        Afterwards the relay is set to 'NORMALLY_CLOSED'. Each state change
        additionally respects Relay.transition_wait_time.

        Args:
            seconds: The number of seconds to sleep for between the two state
                changes.
        """
        self.set_no()
        sleep(seconds)
        self.set_nc()

    # NOTE: the default below is bound when the class body is executed, so
    # later changes to Relay.button_press_time do not alter it.
    def set_nc_for(self, seconds=button_press_time):
        """Sets the relay to 'NORMALLY_CLOSED' for seconds. Blocks the thread.

        Afterwards the relay is set to 'NORMALLY_OPEN'. Each state change
        additionally respects Relay.transition_wait_time.

        Args:
            seconds: The number of seconds to sleep for between the two state
                changes.
        """
        self.set_nc()
        sleep(seconds)
        self.set_no()

    def get_status(self):
        """Returns the state the relay board reports for this relay."""
        return self.relay_board.get_relay_status(self.position)

    def clean_up(self):
        """Does any clean up needed to allow the next series of tests to run.

        For now, all this does is switches to its previous state, i.e. the
        state recorded by the first successful call to set(). The recorded
        state is kept afterwards, so is_dirty() continues to return True.

        Inheriting from this class and overriding this method would be the
        best course of action to allow a more complex clean up to occur. If
        you do this, be sure to make the necessary modifications in
        RelayRig.initialize_relay and RelayRigParser.parse_json_relays.
        """
        if self._original_state is not None:
            self.set(self._original_state)

    def is_dirty(self):
        """Returns True once set() has recorded an original state."""
        return self._original_state is not None
170
171
class RelayDict(object):
    """A read-only view over a dictionary of relays with config-aware errors.

    Lookup, iteration, len() and repr() are forwarded to the wrapped
    dictionary. When a lookup fails with a KeyError, a RelayConfigError is
    raised instead, telling the user that the relay was not found in the
    config.

    No item assignment or deletion is offered, because changing the relays
    here does not change what they are in hardware. The given dictionary is
    wrapped, not copied.
    """
    ERROR_MESSAGE = ('Error: Attempted to get relay "%s" in %s "%s" '
                     'but the relay does not exist.\n'
                     'Existing relays are: %s.\n'
                     'Make sure the missing relay is added to the config file, '
                     'and is properly setup.')

    def __init__(self, relay_device, input_dict):
        """Wraps input_dict on behalf of relay_device.

        Args:
            relay_device: The device the relays belong to. Its type and its
                name attribute are used when building error messages.
            input_dict: The dictionary of relays to expose.
        """
        self._store = input_dict
        self.relay_device = relay_device

    def __getitem__(self, key):
        """Returns the relay stored under key.

        Raises:
            RelayConfigError if the wrapped dictionary raises KeyError.
        """
        try:
            relay = self._store[key]
        except KeyError:
            details = (key, type(self.relay_device), self.relay_device.name,
                       self._store)
            raise RelayConfigError(self.ERROR_MESSAGE % details)
        return relay

    def __repr__(self):
        """Returns the repr of the wrapped dictionary."""
        return repr(self._store)

    def __len__(self):
        """Returns the number of entries in the wrapped dictionary."""
        return len(self._store)

    def __iter__(self):
        """Returns an iterator over the wrapped dictionary."""
        return iter(self._store)
208