#!/usr/bin/python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import atexit
import errno
import logging
import re
import sys
import socket
import threading
import xmlrpclib

import rpm_controller
import rpm_logging_config

from config import rpm_config
from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
from rpm_infrastructure_exception import RPMInfrastructureException

import common
from autotest_lib.site_utils.rpm_control_system import utils

LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'dispatcher_logname_format')

# A rack id that starts with 'rack<digits><letters>' (e.g. 'rack1a') selects
# the web powered RPM controller in RPMDispatcher._create_rpm_controller.
_WEBPOWERED_RACK_ID_RE = re.compile('rack[0-9]+[a-z]+')


def _describe_socket_error(error):
    """
    Returns a printable description of a socket error.

    Some socket.error subclasses (for example socket.timeout or
    socket.gaierror) do not carry an errno that is listed in errno.errorcode.
    Looking such a value up directly raises a KeyError that hides the original
    failure, so fall back to the text of the error itself.

    @param error: socket.error instance to describe.

    @return: Symbolic errno name (e.g. 'ECONNREFUSED') when the errno is
             known, str(error) otherwise.
    """
    error_number = getattr(error, 'errno', None)
    return errno.errorcode.get(error_number, str(error))


class RPMDispatcher(object):
    """
    This class is the RPM dispatcher server and it is responsible for
    communicating directly to the RPM devices to change a DUT's outlet status.

    When an RPMDispatcher is initialized it registers itself with the frontend
    server, who will field out outlet requests to this dispatcher.

    Once a request is received the dispatcher looks up the RPMController
    instance for the given DUT and then queues up the request and blocks until
    it is processed.

    @var _address: IP address or Hostname of this dispatcher server.
    @var _frontend_server: URI of the frontend server.
    @var _lock: Lock used to synchronize access to _worker_dict.
    @var _port: Port assigned to this server instance.
    @var _worker_dict: Dictionary mapping RPM hostnames to RPMController
                       instances.
    """


    def __init__(self, address, port):
        """
        RPMDispatcher constructor.

        Initializes instance vars and registers this server with the frontend
        server.

        @param address: Address of this dispatcher server.
        @param port: Port assigned to this dispatcher server.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to register with the frontend
                                           server.
        """
        self._address = address
        self._port = port
        self._lock = threading.Lock()
        self._worker_dict = {}
        # We assume that the frontend server and dispatchers are running on the
        # same host, and the frontend server is listening for connections from
        # the external world.
        frontend_server_port = rpm_config.getint('RPM_INFRASTRUCTURE',
                                                 'frontend_port')
        self._frontend_server = 'http://%s:%d' % (socket.gethostname(),
                                                  frontend_server_port)
        logging.info('Registering this rpm dispatcher with the frontend '
                     'server at %s.', self._frontend_server)
        client = xmlrpclib.ServerProxy(self._frontend_server)
        # De-register with the frontend when the dispatcher exits.
        atexit.register(self._unregister)
        try:
            client.register_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to register with frontend server. Error: %s.' %
                       _describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)


    def _worker_dict_put(self, key, value):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @param value: value we are putting into _worker_dict.
        """
        with self._lock:
            self._worker_dict[key] = value


    def _worker_dict_get(self, key):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @return: value found when accessing _worker_dict, or None if the key
                 is not present.
        """
        with self._lock:
            return self._worker_dict.get(key)


    def is_up(self):
        """
        Allows the frontend server to see if the dispatcher server is up before
        attempting to queue requests.

        @return: True. If connection fails, the client proxy will throw a socket
                 error on the client side.
        """
        return True


    def queue_request(self, powerunit_info_dict, new_state):
        """
        Looks up the appropriate RPMController instance for the device and
        queues up the request.

        @param powerunit_info_dict: A dictionary, containing the
                                    attribute/values of an unmarshalled
                                    PowerUnitInfo instance.
        @param new_state: [ON, OFF, CYCLE] state we want to the change the
                          outlet to.
        @return: True if the attempt to change power state was successful,
                 False otherwise. False is also returned when the request does
                 not name a powerunit hostname, since no controller can be
                 looked up for it.
        """
        powerunit_info = utils.PowerUnitInfo(**powerunit_info_dict)
        logging.info('Received request to set device: %s to state: %s',
                     powerunit_info.device_hostname, new_state)
        controller = self._get_rpm_controller(
                powerunit_info.powerunit_hostname,
                powerunit_info.hydra_hostname)
        if controller is None:
            # _get_rpm_controller returns None for an empty powerunit hostname;
            # report the failure instead of dying with an AttributeError.
            logging.error('No powerunit hostname given for device: %s; unable '
                          'to set state: %s', powerunit_info.device_hostname,
                          new_state)
            return False
        return controller.queue_request(powerunit_info, new_state)


    def _get_rpm_controller(self, rpm_hostname, hydra_hostname=None):
        """
        Private method that retrieves the appropriate RPMController instance
        for this RPM Hostname or calls _create_rpm_controller if it does not
        already exist.

        @param rpm_hostname: hostname of the RPM whose RPMController we want.
        @param hydra_hostname: passed on to _create_rpm_controller when a new
                               controller has to be created.

        @return: RPMController instance responsible for this RPM, or None if
                 rpm_hostname is empty.
        """
        if not rpm_hostname:
            return None
        # The lookup and the creation happen under a single lock acquisition.
        # Locking them separately would let two server threads both miss the
        # lookup and each create a controller for the same RPM. The dict is
        # accessed directly because _lock is not reentrant.
        with self._lock:
            controller = self._worker_dict.get(rpm_hostname)
            if controller is None:
                controller = self._create_rpm_controller(
                        rpm_hostname, hydra_hostname)
                self._worker_dict[rpm_hostname] = controller
            return controller


    def _create_rpm_controller(self, rpm_hostname, hydra_hostname):
        """
        Determines the type of RPMController required and initializes it.

        The decision is based on the second to last '-' separated element of
        the hostname.

        @param rpm_hostname: Hostname of the RPM we need to communicate with.
        @param hydra_hostname: Only handed to the Sentry controller; the other
                               controller types do not receive it.

        @return: RPMController instance responsible for this RPM.

        @raise IndexError: If rpm_hostname contains no '-'.
        """
        device_type = rpm_hostname.split('-')[-2]
        if device_type == 'poe':
            # POE switch hostname looks like 'chromeos2-poe-switch1'.
            logging.info('The controller is a Cisco POE switch.')
            return rpm_controller.CiscoPOEController(rpm_hostname)
        # The device is an RPM; device_type holds its rack id.
        if _WEBPOWERED_RACK_ID_RE.match(device_type):
            logging.info('RPM is a webpowered device.')
            return rpm_controller.WebPoweredRPMController(rpm_hostname)
        logging.info('RPM is a Sentry CDU device.')
        return rpm_controller.SentryRPMController(
                hostname=rpm_hostname,
                hydra_hostname=hydra_hostname)


    def _get_serveruri(self):
        """
        Formats the _address and _port into a meaningful URI string.

        @return: URI of this dispatch server.
        """
        return 'http://%s:%d' % (self._address, self._port)


    def _unregister(self):
        """
        Tells the frontend server that this dispatch server is shutting down and
        to unregister it.

        Called by atexit.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to unregister with the
                                           frontend server.
        """
        logging.info('Dispatch server shutting down. Unregistering with RPM '
                     'frontend server.')
        client = xmlrpclib.ServerProxy(self._frontend_server)
        try:
            client.unregister_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to unregister with frontend server. Error: %s.' %
                       _describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)


def launch_server_on_unused_port():
    """
    Looks up an unused port on this host and launches the xmlrpc server.

    Useful for testing by running multiple dispatch servers on the same host.

    @return: server,port - server object and the port on which it is
             listening.
    """
    address = socket.gethostbyname(socket.gethostname())
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Set this socket to allow reuse.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Binding to port 0 asks the OS to pick a free port for us.
        sock.bind(('', 0))
        port = sock.getsockname()[1]
        server = MultiThreadedXMLRPCServer((address, port),
                                           allow_none=True)
    finally:
        # Release the probe socket even if the server could not be created.
        sock.close()
    return server, port


def main():
    """
    Main function used to launch the dispatch server. Creates an instance of
    RPMDispatcher and registers it to a MultiThreadedXMLRPCServer instance.
    """
    if len(sys.argv) != 2:
        print('Usage: ./%s <log_file_name>' % sys.argv[0])
        sys.exit(1)

    rpm_logging_config.start_log_server(sys.argv[1], LOG_FILENAME_FORMAT)
    rpm_logging_config.set_up_logging_to_server()

    # Get the local ip _address and set the server to utilize it.
    address = socket.gethostbyname(socket.gethostname())
    server, port = launch_server_on_unused_port()
    rpm_dispatcher = RPMDispatcher(address, port)
    server.register_instance(rpm_dispatcher)
    server.serve_forever()


if __name__ == '__main__':
    main()