#!/usr/bin/env python3
#
#   Copyright 2016 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from future.moves.urllib.request import urlopen
import re

from acts.controllers.relay_lib.errors import RelayDeviceConnectionError
from acts.controllers.relay_lib.helpers import validate_key
from acts.controllers.relay_lib.relay import RelayState
from acts.controllers.relay_lib.relay_board import RelayBoard

BASE_URL = 'http://192.168.1.4/30000/'


class SainSmartBoard(RelayBoard):
    """Controls and queries SainSmart Web Relay Board.

    Controls and queries SainSmart Web Relay Board, found here:
    http://www.sainsmart.com/sainsmart-rj45-tcp-ip-remote-controller-board-with-8-channels-relay-integrated.html
    this uses a web interface to toggle relays.

    There is an unmentioned hidden status page that can be found at <root>/99/.
    """

    # No longer used. Here for debugging purposes.
    #
    # Old status pages. Used before base_url/99 was found.
    # STATUS_1 = '40'
    # STATUS_2 = '43'
    #
    # This is the regex used to parse the old status pages:
    # r'y-\d(?P<relay>\d).+?> (?: )?(?P<status>.*?)&'
    #
    # Pages that will turn all switches on or off, even the ghost switches.
    # ALL_RELAY_OFF = '44'
    # ALL_RELAY_ON = '45'

    HIDDEN_STATUS_PAGE = '99'

    VALID_RELAY_POSITIONS = [0, 1, 2, 3, 4, 5, 6, 7]
    NUM_RELAYS = 8

    def __init__(self, config):
        """Reads 'base_url' from config and initializes the relay board.

        Args:
            config: The board configuration. It must contain a string
                'base_url' entry; a trailing '/' is appended if missing.
        """
        # This will be lazy loaded
        self.status_dict = None
        self.base_url = validate_key('base_url', config, str, 'config')
        if not self.base_url.endswith('/'):
            self.base_url += '/'
        super(SainSmartBoard, self).__init__(config)

    def get_relay_position_list(self):
        """Returns the list of relay positions available on this board."""
        return self.VALID_RELAY_POSITIONS

    def _load_page(self, relative_url):
        """Loads a web page at self.base_url + relative_url.

        Properly opens and closes the web page.

        Args:
            relative_url: The string appended to the base_url.

        Returns:
            the contents of the web page.

        Raises:
            A RelayDeviceConnectionError is raised if the page cannot be loaded.

        """
        url = self.base_url + relative_url
        try:
            page = urlopen(url)
            try:
                result = page.read().decode('utf-8')
            finally:
                # Release the connection even when reading or decoding fails.
                page.close()
        except IOError:
            raise RelayDeviceConnectionError(
                'Unable to connect to board "{}" through {}'.format(
                    self.name, url))
        return result

    def _sync_status_dict(self):
        """Refreshes self.status_dict from the board's hidden status page.

        The status page is expected to contain a run of '0'/'1' characters
        immediately followed by 'TUX'. The character at index i is stored as
        the state of relay i: '1' maps to RelayState.NC, anything else to
        RelayState.NO.

        Raises:
            A RelayDeviceConnectionError is raised if the page cannot be
            loaded, if 'TUX' is missing from the page, or if the relay states
            cannot be parsed out of the page.
        """
        result = self._load_page(self.HIDDEN_STATUS_PAGE)
        if 'TUX' not in result:
            raise RelayDeviceConnectionError(
                'Sainsmart board with URL %s has not completed initialization '
                'after its IP was set, and must be power-cycled to prevent '
                'random disconnections. After power-cycling, make sure %s/%s '
                'has TUX appear in its output.' %
                (self.base_url, self.base_url, self.HIDDEN_STATUS_PAGE))

        match = re.search(r'">([01]*)TUX', result)
        if match is None:
            # 'TUX' is present but not in the expected markup; report it as a
            # board error rather than failing with an AttributeError on None.
            raise RelayDeviceConnectionError(
                'Unable to parse the relay states from the status page of '
                'board "{}" at {}{}'.format(
                    self.name, self.base_url, self.HIDDEN_STATUS_PAGE))
        status_string = match.group(1)

        self.status_dict = {
            index: (RelayState.NC if char == '1' else RelayState.NO)
            for index, char in enumerate(status_string)
        }

    def _print_status(self):
        """Prints out the list of relays and their current state."""
        # Match get_relay_status()/set(): load the status on first use.
        if self.status_dict is None:
            self._sync_status_dict()
        for i in self.VALID_RELAY_POSITIONS:
            print('Relay {}: {}'.format(i, self.status_dict[i]))

    def get_relay_status(self, relay_position):
        """Returns the current status of the passed in relay.

        The status is read from the locally cached status_dict, which is
        loaded from the board the first time it is needed.

        Args:
            relay_position: The key of the relay in status_dict.

        Returns:
            The cached state stored for relay_position.
        """
        if self.status_dict is None:
            self._sync_status_dict()
        return self.status_dict[relay_position]

    def set(self, relay_position, value):
        """Sets the given relay to the state indicated by value.

        Args:
            relay_position: The position of the relay to set.
            value: The state to set; RelayState.NC selects the "on" page,
                any other value selects the "off" page.

        Raises:
            A RelayDeviceConnectionError is raised if the board cannot be
            reached.
        """
        if self.status_dict is None:
            self._sync_status_dict()
        self._load_page(self._get_relay_url_code(relay_position, value))
        # Only record the new state once the request has gone through.
        self.status_dict[relay_position] = value

    @staticmethod
    def _get_relay_url_code(relay_position, no_or_nc):
        """Returns the two digit code corresponding to setting the relay.

        Each relay owns two consecutive page codes: relay_position * 2 when
        no_or_nc is not RelayState.NC, and relay_position * 2 + 1 when it is.

        Args:
            relay_position: The integer position of the relay.
            no_or_nc: The RelayState to set the relay to.

        Returns:
            The page code as a zero-padded, two digit string.
        """
        if no_or_nc == RelayState.NC:
            on_modifier = 1
        else:
            on_modifier = 0
        return '{:02d}'.format(relay_position * 2 + on_modifier)