1import logging, re, time 2 3from autotest_lib.server import autotest, test, hosts 4from autotest_lib.server.cros import stress 5from autotest_lib.client.common_lib import error 6 7# Timeout duration to wait for a DHCP response. 8# It usually doesn't take this long, but just in case. 9TIMEOUT = 60 10 11class network_StressServoEthernetPlug(test.test): 12 13 ETH_MAC = 'mac' 14 ETH_IP = 'ipaddress' 15 16 version = 1 17 18 19 def initialize(self, host): 20 self.host = host 21 self.host_iface = None 22 self.servo_iface = None 23 self.servo_eth_up() 24 25 end_time = time.time() + TIMEOUT 26 while time.time() < end_time: 27 self.eth_interfaces = self.get_eth_interfaces() 28 if len(self.eth_interfaces) >= 2: 29 break 30 time.sleep(1) 31 32 # Assuming 2 ethernet interfaces, the interface not for host 33 # is that associated with servo. 34 for iface, eth_dict in self.eth_interfaces.iteritems(): 35 if eth_dict[self.ETH_IP] == self.host.hostname: 36 self.host_iface = iface 37 else: 38 self.servo_iface = iface 39 40 if not self.servo_iface: 41 raise error.TestError('Cannot find servo ethernet interface') 42 43 logging.info('Servo eth: %s', self.servo_iface) 44 logging.info('Host eth: %s', self.host_iface) 45 46 47 def servo_eth_up(self): 48 logging.info('Bringing up ethernet') 49 self.host.servo.set('dut_hub_on', 'yes') 50 51 52 def servo_eth_down(self): 53 logging.info('Bringing down ethernet') 54 self.host.servo.set('dut_hub_on', 'no') 55 56 57 def get_eth_interfaces(self): 58 """ Gets the ethernet device object. 59 60 Returns: 61 A dictionary of ethernet devices. 62 { 63 'eth<x>': 64 { 65 'mac': <mac address>, 66 'ipaddress': <ipaddress>, 67 } 68 } 69 """ 70 results = self.host.run('ifconfig').stdout.split('\n') 71 eth_dict = {} 72 73 iterator = results.__iter__() 74 for line in iterator: 75 # Search for the beginning of an interface section. 76 iface_start = re.search('^(eth\S+)\s+Link encap:Ethernet\s+HWaddr' 77 '\s+(\S+)', line) 78 if iface_start: 79 (iface, hwaddr) = iface_start.groups() 80 line = iterator.next() 81 result = re.search('^\s+inet addr:(\S+)\s+', line) 82 ipaddress = None 83 if result: 84 ipaddress = result.groups()[0] 85 eth_dict[iface] = {self.ETH_MAC: hwaddr, self.ETH_IP: ipaddress} 86 return eth_dict 87 88 89 def verify_eth_status(self, up_list, timeout=TIMEOUT): 90 """ Verify the up_list ifaces are up (and its contrapositive). """ 91 end_time = time.time() + timeout 92 interfaces = {} 93 while time.time() < end_time: 94 interfaces = self.get_eth_interfaces() 95 error_message = ('Expected eth status %s but instead got %s' % 96 (up_list, interfaces.keys())) 97 if set(interfaces.keys()) == set(up_list): 98 # Check to make sure all the interfaces are up. 99 for iface, eth_dict in interfaces.iteritems(): 100 if not eth_dict[self.ETH_IP]: 101 error_message = ('Ethernet interface %s did not ' 102 'receive address.' % iface) 103 break 104 else: 105 # All desired interfaces are up, and they have ip addresses. 106 break 107 time.sleep(1) 108 else: 109 # If the while loop terminates without interruption, we've timed out 110 # waiting for the interface. 111 raise error.TestFail(error_message) 112 113 114 def run_once(self, num_iterations=1): 115 for iteration in range(num_iterations): 116 logging.info('Executing iteration %d', iteration) 117 self.servo_eth_down() 118 self.verify_eth_status([self.host_iface]) 119 self.servo_eth_up() 120 self.verify_eth_status([self.host_iface, self.servo_iface]) 121