# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrapper test to run verification on a labstation."""

import json
import logging
import os
import re
import time

from autotest_lib.client.common_lib import error
from autotest_lib.server import test
from autotest_lib.server import utils as server_utils
from autotest_lib.server import site_utils
from autotest_lib.server.hosts import servo_host as _servo_host
from autotest_lib.server.hosts import servo_constants
from autotest_lib.server.hosts import factory
from autotest_lib.server.hosts import host_info


class servo_LabstationVerification(test.test):
    """Wrapper test to run verifications on a labstation image.

    This test verifies basic servod behavior on the host supplied to it e.g.
    that servod can start etc, before inferring the DUT attached to the servo
    device, and running more comprehensive servod tests by using a full
    cros_host and servo_host setup.
    """
    version = 1

    # Mask selecting the Universal/Local bit inside the first byte of a MAC.
    UL_BIT_MASK = 0x2

    # Regex to match ipv4 byte.
    IPV4_RE_BLOCK = r'(25[0-5]|2[0-4][0-9]|[01]?[0-9]?[0-9])'

    # Full regex to match an ipv4 with optional subnet mask.
    RE_IPV4 = re.compile(r'^(%(block)s\.){3}(%(block)s)(/\d+)?$' %
                         {'block':IPV4_RE_BLOCK})

    # Timeout in seconds to wait after cold_reset before attempting to ping
    # again. This includes a potential fw screen (30s), and some buffer
    # for the network.
    RESET_TIMEOUT_S = 60

    def get_servo_mac(self, servo_proxy):
        """Given a servo's serial retrieve ethernet port mac address.

        The serial is looked up in 'serial_to_mac_map.json', which lives next
        to this file.

        @param servo_proxy: proxy to talk to servod

        @returns: mac address of the ethernet port as a string
        @raises: error.TestError: if mac address cannot be inferred
        """
        # TODO(coconutruben): once mac address retrieval through v4 is
        # implemented remove these lines of code, and replace with
        # servo_v4_eth_mac.
        try:
            serial = servo_proxy.get('support.serialname')
            if serial == 'unknown':
                serial = servo_proxy.get('serialname')
        except error.TestFail as e:
            # Inspect the message text: testing membership on the exception
            # object itself does not perform a substring search.
            if 'No control named' not in str(e):
                raise
            serial = servo_proxy.get('serialname')
        ctrl_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                 'serial_to_mac_map.json')
        with open(ctrl_path, 'r') as f:
            serial_mac_map = json.load(f)
        if serial not in serial_mac_map:
            raise error.TestError('Unable to retrieve mac address for '
                                  'serial %s' % serial)
        return str(serial_mac_map[serial])

    def _flip_UL_bit(self, byte):
        """Helper to flip the Universal/Local bit in a given byte.

        For some IPv6's extended unique identifier (EUI) 64 calculation
        part of the logic is to flip the U/L bit on the first byte.

        Note: it is the callers responsibility to ensure that |byte| is
        only one byte. This function will just flip the 7th bit of whatever
        is supplied and return that.

        @param byte: the byte to flip, as an int

        @returns: |byte| with its U/L bit flipped.
        """
        return byte ^ self.UL_BIT_MASK

    def _from_mac_to_ipv6_eui_64(self, mac):
        """Convert a MAC address (IEEE EUI48) to a IEEE EUI64 node component.

        This follows guidelines to convert a mac address to an IPv6 node
        component by
        - splitting the mac into two parts
        - inserting 0xfffe in between the two parts
        - flipping the U/L bit on the first byte

        @param mac: string containing the mac address, ':' separated

        @returns: string containing the IEEE EUI64 node component to |mac|
        """
        mac_bytes = [b.lower() for b in mac.split(':')]
        # Flip the U/L bit of the first byte: parse the hex string into an
        # int, flip the bit, and format it back as two hex digits.
        mac_bytes[0] = '%02x' % self._flip_UL_bit(int(mac_bytes[0], 16))
        mac_bytes = mac_bytes[:3] + ['ff', 'fe'] + mac_bytes[-3:]
        # IPv6 has two bytes between ':' so pair up neighbouring bytes.
        ipv6_components = [''.join(mac_bytes[i:i + 2])
                           for i in range(0, len(mac_bytes), 2)]
        # Lastly, remove the leading 0s to have a well formatted concise IPv6.
        # An all-zero group has to stay '0' rather than collapse to an empty
        # string, which would corrupt the address.
        return ':'.join([c.lstrip('0') or '0' for c in ipv6_components])

    def _mac_to_ipv6_addr(self, mac, ipv6_network_component):
        """Helper to generate an IPv6 address given network component and mac.

        @param mac: the mac address of the target network interface
        @param ipv6_network_component: prefix + subnet id portion of IPv6 [:64]

        @returns: an IPv6 address that could be used to target the network
                  interface at |mac| if it's on the same network as the network
                  component indicates
        """
        # Do not add an extra/miss a ':' when glueing both parts together.
        # endswith() also copes with an empty network component.
        glue = '' if ipv6_network_component.endswith(':') else ':'
        return '%s%s%s' % (ipv6_network_component, glue,
                           self._from_mac_to_ipv6_eui_64(mac))

    def _from_ipv6_to_mac_address(self, ipv6):
        """Given an IPv6 address retrieve the mac address.

        Assuming the address at |ipv6| followed the conversion standard laid
        out at _from_mac_to_ipv6_eui_64() above, this helper does the inverse.

        @param ipv6: full IPv6 address to extract the mac address from

        @returns: mac address extracted from node component as a string
        """
        # The node component i.e. the one holding the mac info is the last
        # 64 bits, which is the last four ':' separated groups.
        components = ipv6.split(':')[-4:]
        # This is reversing the EUI 64 logic.
        mac_bytes = []
        for component in components:
            # Expand the components fully again.
            full_component = component.rjust(4, '0')
            # Mac addresses use one byte components as opposed to the two byte
            # ones for IPv6 - split them up.
            mac_bytes.extend([full_component[:2], full_component[2:]])
        # First, flip the 7th bit again. The byte is a hex string at this
        # point, so convert it to an int for the flip and back afterwards.
        mac_bytes[0] = '%02x' % self._flip_UL_bit(int(mac_bytes[0], 16))
        # Second, remove the 0xFFFE bytes inserted in the middle again.
        mac_bytes = mac_bytes[:3] + mac_bytes[-3:]
        return ':'.join([c.lower() for c in mac_bytes])

    def _build_ssh_cmd(self, hostname, cmd):
        """Build the ssh command to run |cmd| via bash on |hostname|.

        Note: |cmd| is wrapped in double quotes verbatim, no escaping is done.

        @param hostname: hostname/ip where to run the cmd on
        @param cmd: cmd on hostname to run

        @returns: ssh command to run
        """
        ssh_cmd = [r'ssh', '-q', '-o', 'StrictHostKeyChecking=no',
                   r'-o', 'UserKnownHostsFile=/dev/null',
                   r'root@%s' % hostname,
                   r'"%s"' % cmd]
        return ' '.join(ssh_cmd)

    def _ip_info_from_host(self, host, ip, info, host_name):
        """Retrieve some |info| related to |ip| from host on |ip|.

        @param host: object that implements 'run', where the command
                     will be executed from
        @param ip: ip address to run on and to filter for
        @param info: one of 'ipv4' or 'dev'
        @param host_name: executing host's name, for error message

        @returns: ipv4 associated on the same nic as |ip| if |info|== 'ipv4'
                  nic dev name associated with |ip| if |info|== 'dev'

        @raises error.TestFail: if output of 'ip --brief addr' is unexpected
        @raises error.TestFail: info not in ['ipv4', 'dev']
        """
        if info not in ['ipv4', 'dev']:
            raise error.TestFail('Cannot retrieve info %r' % info)
        ip_stub = r"ip --brief addr | grep %s" % ip
        cmd = self._build_ssh_cmd(ip, ip_stub)
        logging.info('command to find %s on %s: %s', info, host_name, cmd)
        # The expected output here is of the form:
        # [net device] [UP/DOWN] [ipv4]/[subnet mask] [ipv6]/[subnet mask]+
        try:
            output = host.run(cmd).stdout.strip()
        except (error.AutoservRunError, error.CmdError) as e:
            logging.error(str(e))
            raise error.TestFail('Failed to retrieve %s on %s' % (info, ip))
        logging.debug('ip raw output: %s', output)
        components = output.split()
        if not components:
            # Report empty output as a test failure instead of letting an
            # IndexError escape from the lookups below.
            raise error.TestFail('Failed to retrieve %s on %s' % (info, ip))
        if info == 'dev':
            ret = components[0]
            logging.info('dev found: %s', ret)
            return ret
        # |info| is 'ipv4'. To be safe, get all IPs, and subsequently report
        # the first ipv4 found.
        raw_ips = components[2:]
        for raw_ip in raw_ips:
            if self.RE_IPV4.match(raw_ip):
                ret = raw_ip.split('/')[0]
                logging.info('ipv4 found: %s', ret)
                return ret
        raise error.TestFail('No ipv4 address found in ip command: %s' %
                             ', '.join(raw_ips))

    def get_dut_on_servo_ip(self, servo_host_proxy):
        """Retrieve the IPv4 IP of the DUT attached to a servo.

        Note: this will reboot the DUT if it fails initially to get the IP
        Note: for this to work, servo host and dut have to be on the same subnet

        @param servo_host_proxy: proxy to talk to the servo host

        @returns: IPv4 address of DUT attached to servo on |servo_host_proxy|

        @raises error.TestError: if the ip cannot be inferred
        """
        # Note: throughout this method, sh refers to servo host, dh to DUT host.
        # Figure out servo hosts IPv6 address that's based on its mac address.
        servo_proxy = servo_host_proxy._servo
        sh_ip = server_utils.get_ip_address(servo_host_proxy.hostname)
        sh_nic_dev = self._ip_info_from_host(servo_host_proxy, sh_ip, 'dev',
                                             'servo host')
        addr_cmd = 'cat /sys/class/net/%s/address' % sh_nic_dev
        sh_dev_addr = servo_host_proxy.run(addr_cmd).stdout.strip()
        logging.debug('Inferred Labstation MAC to be: %s', sh_dev_addr)
        sh_dev_ipv6_stub = self._from_mac_to_ipv6_eui_64(sh_dev_addr)
        # This will get us the IPv6 address that uses the mac address as node id
        cmd = (r'ifconfig %s | grep -oE "([0-9a-f]{0,4}:){4}%s"' %
               (sh_nic_dev, sh_dev_ipv6_stub))
        servo_host_ipv6 = servo_host_proxy.run(cmd).stdout.strip()
        logging.debug('Inferred Labstation IPv6 to be: %s', servo_host_ipv6)
        # Figure out DUTs expected IPv6 address
        # The network component should be shared between the DUT and the servo
        # host as long as they're on the same subnet.
        network_component = ':'.join(servo_host_ipv6.split(':')[:4])
        dut_ipv6 = self._mac_to_ipv6_addr(self.get_servo_mac(servo_proxy),
                                          network_component)
        logging.info('Inferred DUT IPv6 to be: %s', dut_ipv6)
        # Check that the DUT is reachable before asking it for its ipv4.
        try:
            server_utils.run('ping -6 -c 1 -w 35 %s' % dut_ipv6)
        except error.CmdError:
            # If the DUT cannot be pinged, then cold reset it and give it
            # time to come back up before talking to it below.
            logging.info('Failed to ping DUT on ipv6: %s. Cold resetting',
                         dut_ipv6)
            servo_proxy._power_state.reset()
            time.sleep(self.RESET_TIMEOUT_S)
        try:
            # Pass |server_utils| here as it implements the same interface
            # as a host to run things locally i.e. on the autoserv runner.
            return self._ip_info_from_host(server_utils, dut_ipv6, 'ipv4',
                                           'autoserv')
        except error.TestFail:
            logging.info('Failed to retrieve the DUT ipv4 directly. '
                         'Going to attempt to tunnel request through '
                         'labstation and forgive the error for now.')
        # Lastly, attempt to run the command from the labstation instead
        # to guard against networking issues.
        return self._ip_info_from_host(servo_host_proxy, dut_ipv6, 'ipv4',
                                       'servo host')

    def _set_dut_stable_version(self, dut_host, stable_version=None):
        """Helper method to set stable_version in DUT host.

        @param dut_host: CrosHost object representing the DUT.
        @param stable_version: version to store under the 'cros' key; falls
                               back to |self.cros_version| when not provided.
        """
        if not stable_version:
            stable_version = self.cros_version
        logging.info('Setting stable_version to %s for DUT %s.',
                     stable_version, dut_host.hostname)
        info = dut_host.host_info_store.get()
        info.stable_versions['cros'] = stable_version
        dut_host.host_info_store.commit(info)

    def _get_dut_info_from_config(self):
        """Get DUT info from json config file.

        The config is read from 'labstation_to_dut_map.json', which lives
        next to this file, and is keyed by labstation hostname.

        @returns a list of dicts that each dict represents a dut.
        """
        ctrl_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                 'labstation_to_dut_map.json')
        with open(ctrl_path, 'r') as f:
            data = json.load(f, object_hook=self._byteify)
        # Create a default dut dict in case the servohost is not in the
        # config map; this normally happens in local testing.
        default_dut = {
            'hostname': None,
            'servo_port': '9999',
            'servo_serial': None
        }
        return data.get(self.labstation_host.hostname, [default_dut])

    def _byteify(self, data, ignore_dicts=False):
        """Helper method to convert unicode to string.

        Used as json |object_hook|. Lists are converted element by element;
        dicts are only converted when |ignore_dicts| is False.

        Note: this relies on |unicode| and dict.iteritems(), i.e. Python 2.

        @param data: decoded json value to convert
        @param ignore_dicts: if True, dicts are returned unchanged

        @returns: |data| with unicode strings encoded as utf-8
        """
        if isinstance(data, unicode):
            return data.encode('utf-8')
        if isinstance(data, list):
            return [self._byteify(item, ignore_dicts=True) for item in data]
        if isinstance(data, dict) and not ignore_dicts:
            return {
                self._byteify(key, ignore_dicts=True):
                    self._byteify(value, ignore_dicts=True)
                for key, value in data.iteritems()
            }
        return data

    def _setup_servod(self):
        """Setup all servod instances under servohost for later testing.

        Appends one servo host per entry of |self.dut_list| to
        |self.servo_hosts|, in the same order.

        @raises error.TestFail: if servod for a port did not come up
        """
        for dut in self.dut_list:
            # Use board: nami as default for local testing.
            board = dut.get('board', 'nami')
            port = dut.get('servo_port')
            serial = dut.get('servo_serial')
            servo_args = {
                servo_constants.SERVO_HOST_ATTR:
                    self.labstation_host.hostname,
                servo_constants.SERVO_PORT_ATTR: port,
                servo_constants.SERVO_SERIAL_ATTR: serial,
                servo_constants.SERVO_BOARD_ATTR: board,
                servo_constants.ADDITIONAL_SERVOD_ARGS: 'DUAL_V4=1',
                'is_in_lab': False,
            }

            logging.info('Setting up servod for port %s', port)
            # We need try_lab_servo option here, so servo firmware will get
            # updated before run tests.
            servo_host, _ = _servo_host.create_servo_host(None,
                                                          servo_args,
                                                          try_lab_servo=True)
            try:
                validate_cmd = 'servodutil show -p %s' % port
                servo_host.run_grep(validate_cmd,
                    stdout_err_regexp='No servod scratch entry found.')
            except error.AutoservRunError:
                raise error.TestFail('Servod of port %s did not come up on'
                                     ' labstation.' % port)

            self.servo_hosts.append(servo_host)

    def setup_hosts(self):
        """Prepare all cros and servo hosts that need to run."""
        # Servod came up successfully at this point - build a ServoHost and
        # CrosHost for later testing to verify servo functionality.

        for dut_info, servo_host in zip(self.dut_list, self.servo_hosts):
            dut_hostname = dut_info.get('hostname')
            if not dut_hostname:
                # TODO(coconutruben@): remove this statement once the inferring
                # is the default.
                logging.info('hostname not specified for DUT, through '
                             'static config or command-line. Will attempt '
                             'to infer through hardware address.')
                dut_hostname = self.get_dut_on_servo_ip(servo_host)
            labels = []
            if dut_info.get('board'):
                labels.append('board:%s' % dut_info.get('board'))
            if dut_info.get('model'):
                labels.append('model:%s' % dut_info.get('model'))
            info = host_info.HostInfo(labels=labels)
            host_info_store = host_info.InMemoryHostInfoStore(info=info)
            machine = {
                'hostname': dut_hostname,
                'host_info_store': host_info_store,
                'afe_host': site_utils.EmptyAFEHost()
            }
            dut_host = factory.create_host(machine)
            dut_host.set_servo_host(servo_host)

            # Copy labstation's stable_version to dut_host for later test
            # consume.
            # TODO(xianuowang@): remove this logic once we figured out how to
            # propagate DUT's stable_version to the test.
            stable_version_from_config = dut_info.get('stable_version')
            self._set_dut_stable_version(dut_host, stable_version_from_config)
            # Store |dut_host| in |machine_dict| so that parallel running can
            # find the host.
            self.machine_dict[dut_host.hostname] = dut_host

    def initialize(self, host, config=None, local=False):
        """Setup servod on |host| to run subsequent tests.

        @param host: LabstationHost object representing the servohost.
        @param config: the args argument from test_that in a dict.
        @param local: whether a test image is already on the usb stick.
        """
        # Cache whether this is a local run or not.
        self.local = local
        # This list hosts the servo_hosts, in the same order as the |dut_list|
        # below.
        self.servo_hosts = []
        # This dict houses a mapping of |dut| hostnames to initialized cros_host
        # objects for the tests to run.
        self.machine_dict = {}
        # Save the host.
        self.labstation_host = host
        # Make sure recovery is quick in case of failure.
        self.job.fast = True
        # Get list of duts under the servohost.
        self.dut_list = self._get_dut_info_from_config()
        # Setup servod for all duts.
        self._setup_servod()
        # We need a cros build number for testing download image to usb and
        # use servo to reimage DUT purpose. So copying labstation's
        # stable_version here since we don't really care about which build
        # to install on the DUT.
        self.cros_version = (
            self.labstation_host.host_info_store.get().cros_stable_version)

        if config:
            if 'dut_ip' in config:
                # Retrieve DUT ip from args if caller specified it.
                # |dut_ip| is special in that it can be used for (quick) setup
                # testing if the setup is not in the configuration file.
                # This has two implications:
                # - the user can only test one dut/servo pair
                # - the config has to be empty.
                # TODO(coconutruben): remove this logic for a more holistic
                # command-line overwrite solution.
                if len(self.dut_list) == 1 and not self.dut_list[0]['hostname']:
                    self.dut_list[0]['hostname'] = config['dut_ip']
                    logging.info('Setting the hostname of the only dut to %s.',
                                 self.dut_list[0]['hostname'])
                else:
                    logging.info('dut_ip %s will be ignored. The target '
                                 'labstation is to be part of static config.',
                                 config['dut_ip'])
            if 'cros_version' in config:
                # We allow user to override a cros image build.
                self.cros_version = config['cros_version']
        # Lastly, setup the hosts so that testing can occur in parallel.
        self.setup_hosts()

    def _run_on_machine(self, machine):
        """Thin wrapper to run 'servo_Verification' on all machines.

        @param machine: hostname of the dut to run 'servo_Verification' against.

        @raises error.TestFail: 'servo_Verification' fails
        @raises error.TestFail: |machine| unknown (not in |self.machine_dict|)
        """
        dut_host = self.machine_dict.get(machine, None)
        if dut_host is None:
            raise error.TestFail('dut machine %r not known to suite. Known '
                                 'machines: %r' %
                                 (machine, ', '.join(self.machine_dict.keys())))
        logging.info('About to run on machine %s', machine)
        if not self.job.run_test('servo_Verification', host=dut_host,
                                 local=self.local):
            raise error.TestFail('At least one test failed.')

    def run_once(self):
        """Run through all hosts in |self.machine_dict|."""
        self.job.parallel_simple(self._run_on_machine,
                                 list(self.machine_dict.keys()))
        # TODO(coconutruben): at this point, you can print a report what kind of
        # servod setups failed and which succeeded. Build that out so that
        # debugging failures is cleaner given multiple setups.

    def cleanup(self):
        """Clean up by calling close for dut host, which will also take care
        of servo cleanup.
        """
        for dut in self.machine_dict.values():
            dut.close()