import logging, re, time from autotest_lib.server import autotest, test, hosts from autotest_lib.server.cros import stress from autotest_lib.client.common_lib import error # Timeout duration to wait for a DHCP response. # It usually doesn't take this long, but just in case. TIMEOUT = 60 class network_StressServoEthernetPlug(test.test): ETH_MAC = 'mac' ETH_IP = 'ipaddress' version = 1 def initialize(self, host): self.host = host self.host_iface = None self.servo_iface = None self.servo_eth_up() end_time = time.time() + TIMEOUT while time.time() < end_time: self.eth_interfaces = self.get_eth_interfaces() if len(self.eth_interfaces) >= 2: break time.sleep(1) # Assuming 2 ethernet interfaces, the interface not for host # is that associated with servo. for iface, eth_dict in self.eth_interfaces.iteritems(): if eth_dict[self.ETH_IP] == self.host.hostname: self.host_iface = iface else: self.servo_iface = iface if not self.servo_iface: raise error.TestError('Cannot find servo ethernet interface') logging.info('Servo eth: %s', self.servo_iface) logging.info('Host eth: %s', self.host_iface) def servo_eth_up(self): logging.info('Bringing up ethernet') self.host.servo.set('dut_hub_on', 'yes') def servo_eth_down(self): logging.info('Bringing down ethernet') self.host.servo.set('dut_hub_on', 'no') def get_eth_interfaces(self): """ Gets the ethernet device object. Returns: A dictionary of ethernet devices. { 'eth': { 'mac': , 'ipaddress': , } } """ results = self.host.run('ifconfig').stdout.split('\n') eth_dict = {} iterator = results.__iter__() for line in iterator: # Search for the beginning of an interface section. iface_start = re.search('^(eth\S+)\s+Link encap:Ethernet\s+HWaddr' '\s+(\S+)', line) if iface_start: (iface, hwaddr) = iface_start.groups() line = iterator.next() result = re.search('^\s+inet addr:(\S+)\s+', line) ipaddress = None if result: ipaddress = result.groups()[0] eth_dict[iface] = {self.ETH_MAC: hwaddr, self.ETH_IP: ipaddress} return eth_dict def verify_eth_status(self, up_list, timeout=TIMEOUT): """ Verify the up_list ifaces are up (and its contrapositive). """ end_time = time.time() + timeout interfaces = {} while time.time() < end_time: interfaces = self.get_eth_interfaces() error_message = ('Expected eth status %s but instead got %s' % (up_list, interfaces.keys())) if set(interfaces.keys()) == set(up_list): # Check to make sure all the interfaces are up. for iface, eth_dict in interfaces.iteritems(): if not eth_dict[self.ETH_IP]: error_message = ('Ethernet interface %s did not ' 'receive address.' % iface) break else: # All desired interfaces are up, and they have ip addresses. break time.sleep(1) else: # If the while loop terminates without interruption, we've timed out # waiting for the interface. raise error.TestFail(error_message) def run_once(self, num_iterations=1): for iteration in range(num_iterations): logging.info('Executing iteration %d', iteration) self.servo_eth_down() self.verify_eth_status([self.host_iface]) self.servo_eth_up() self.verify_eth_status([self.host_iface, self.servo_iface])