1# Copyright 2015 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5""" 6Base class for DHCPv6 tests. This class just sets up a little bit of plumbing, 7like a virtual ethernet device with one end that looks like a real ethernet 8device to shill and a DHCPv6 test server on the end that doesn't look like a 9real ethernet interface to shill. Child classes should override test_body() 10with the logic of their test. 11""" 12 13import logging 14import time 15import traceback 16 17from autotest_lib.client.bin import test 18from autotest_lib.client.common_lib import error 19from autotest_lib.client.common_lib.cros import virtual_ethernet_pair 20from autotest_lib.client.cros import dhcpv6_test_server 21from autotest_lib.client.cros.networking import shill_proxy 22 23# These are keys that may be used with the DBus dictionary returned from 24# Dhcpv6TestBase.get_interface_ipconfig(). 25DHCPV6_KEY_ADDRESS = 'Address' 26DHCPV6_KEY_DELEGATED_PREFIX = 'DelegatedPrefix' 27DHCPV6_KEY_DELEGATED_PREFIX_LENGTH = 'DelegatedPrefixLength' 28DHCPV6_KEY_NAMESERVERS = 'NameServers' 29DHCPV6_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains' 30 31# After DHCPv6 completes, an ipconfig should appear shortly after 32IPCONFIG_POLL_COUNT = 5 33IPCONFIG_POLL_PERIOD_SECONDS = 1 34 35class Dhcpv6TestBase(test.test): 36 """Parent class for tests that work verify DHCPv6 behavior.""" 37 version = 1 38 39 def get_device(self, interface_name): 40 """Finds the corresponding Device object for an interface with 41 the name |interface_name|. 42 43 @param interface_name string The name of the interface to check. 44 45 @return DBus interface object representing the associated device. 46 47 """ 48 return self.shill_proxy.find_object('Device', 49 {'Name': interface_name}) 50 51 52 def find_ethernet_service(self, interface_name): 53 """Finds the corresponding service object for an Ethernet interface. 54 55 @param interface_name string The name of the associated interface 56 57 @return Service object representing the associated service. 58 59 """ 60 device = self.get_device(interface_name) 61 device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path) 62 return self.shill_proxy.find_object('Service', {'Device': device_path}) 63 64 65 def get_interface_ipconfig_objects(self, interface_name): 66 """ 67 Returns a list of dbus object proxies for |interface_name|. 68 Returns an empty list if no such interface exists. 69 70 @param interface_name string name of the device to query (e.g., "eth0"). 71 72 @return list of objects representing DBus IPConfig RPC endpoints. 73 74 """ 75 device = self.get_device(interface_name) 76 if device is None: 77 return [] 78 79 device_properties = device.GetProperties(utf8_strings=True) 80 proxy = self.shill_proxy 81 82 ipconfig_object = proxy.DBUS_TYPE_IPCONFIG 83 return filter(bool, 84 [ proxy.get_dbus_object(ipconfig_object, property_path) 85 for property_path in device_properties['IPConfigs'] ]) 86 87 88 def get_interface_ipconfig(self, interface_name): 89 """ 90 Returns a dictionary containing settings for an |interface_name| set 91 via DHCPv6. Returns None if no such interface or setting bundle on 92 that interface can be found in shill. 93 94 @param interface_name string name of the device to query (e.g., "eth0"). 95 96 @return dict containing the the properties of the IPConfig stripped 97 of DBus meta-data or None. 98 99 """ 100 dhcp_properties = None 101 for ipconfig in self.get_interface_ipconfig_objects(interface_name): 102 logging.info('Looking at ipconfig %r', ipconfig) 103 ipconfig_properties = ipconfig.GetProperties(utf8_strings=True) 104 if 'Method' not in ipconfig_properties: 105 logging.info('Found ipconfig object with no method field') 106 continue 107 if ipconfig_properties['Method'] != 'dhcp6': 108 logging.info('Found ipconfig object with method != dhcp6') 109 continue 110 if dhcp_properties != None: 111 raise error.TestFail('Found multiple ipconfig objects ' 112 'with method == dhcp6') 113 dhcp_properties = ipconfig_properties 114 if dhcp_properties is None: 115 logging.info('Did not find IPConfig object with method == dhcp6') 116 return None 117 logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties) 118 return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties) 119 120 121 def run_once(self): 122 self._server = None 123 self._server_ip = None 124 self._ethernet_pair = None 125 self._shill_proxy = shill_proxy.ShillProxy() 126 try: 127 # TODO(zqiu): enable DHCPv6 for peer interface, either by restarting 128 # shill with appropriate command line options or via a new DBUS 129 # command. 130 self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair( 131 interface_ip=None, 132 peer_interface_name='pseudoethernet0', 133 peer_interface_ip=None, 134 interface_ipv6=dhcpv6_test_server.DHCPV6_SERVER_ADDRESS) 135 self._ethernet_pair.setup() 136 if not self._ethernet_pair.is_healthy: 137 raise error.TestFail('Could not create virtual ethernet pair.') 138 self._server_ip = self._ethernet_pair.interface_ip 139 self._server = dhcpv6_test_server.Dhcpv6TestServer( 140 self._ethernet_pair.interface_name) 141 self._server.start() 142 self.test_body() 143 except (error.TestFail, error.TestNAError): 144 # Pass these through without modification. 145 raise 146 except Exception as e: 147 logging.error('Caught exception: %s.', str(e)) 148 logging.error('Trace: %s', traceback.format_exc()) 149 raise error.TestFail('Caught exception: %s.' % str(e)) 150 finally: 151 if self._server is not None: 152 self._server.stop() 153 if self._ethernet_pair is not None: 154 self._ethernet_pair.teardown() 155 156 def test_body(self): 157 """ 158 Override this method with the body of your test. You may safely assume 159 that the the properties exposed by DhcpTestBase correctly return 160 references to the test apparatus. 161 """ 162 raise error.TestFail('No test body implemented') 163 164 @property 165 def server_ip(self): 166 """ 167 Return the IP address of the side of the interface that the DHCPv6 test 168 server is bound to. The server itself is bound the the broadcast 169 address on the interface. 170 """ 171 return self._server_ip 172 173 @property 174 def server(self): 175 """ 176 Returns a reference to the DHCP test server. Use this to add handlers 177 and run tests. 178 """ 179 return self._server 180 181 @property 182 def ethernet_pair(self): 183 """ 184 Returns a reference to the virtual ethernet pair created to run DHCP 185 tests on. 186 """ 187 return self._ethernet_pair 188 189 @property 190 def shill_proxy(self): 191 """ 192 Returns a the shill proxy instance. 193 """ 194 return self._shill_proxy 195 196 197 def check_dhcpv6_config(self): 198 """ 199 Compare the DHCPv6 ipconfig with DHCP lease parameters to ensure 200 that the DUT attained the correct values. 201 202 """ 203 # Retrieve DHCPv6 configuration. 204 for attempt in range(IPCONFIG_POLL_COUNT): 205 dhcpv6_config = self.get_interface_ipconfig( 206 self.ethernet_pair.peer_interface_name) 207 # Wait until both IP address and delegated prefix are obtained. 208 if (dhcpv6_config is not None and 209 dhcpv6_config.get(DHCPV6_KEY_ADDRESS) and 210 dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX)): 211 break; 212 time.sleep(IPCONFIG_POLL_PERIOD_SECONDS) 213 else: 214 raise error.TestFail('Failed to retrieve DHCPv6 ipconfig object ' 215 'from shill.') 216 217 # Verify Non-temporary Address prefix. 218 address = dhcpv6_config.get(DHCPV6_KEY_ADDRESS) 219 actual_prefix = address[:address.index('::')] 220 expected_prefix = dhcpv6_test_server.DHCPV6_SERVER_SUBNET_PREFIX[: 221 dhcpv6_test_server.DHCPV6_SERVER_SUBNET_PREFIX.index('::')] 222 if actual_prefix != expected_prefix: 223 raise error.TestFail('Address prefix mismatch: ' 224 'actual %s expected %s.' % 225 (actual_prefix, expected_prefix)) 226 # Verify Non-temporary Address suffix. 227 actual_suffix = int(address[address.index('::')+2:], 16) 228 if (actual_suffix < dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW or 229 actual_suffix > dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH): 230 raise error.TestFail('Invalid address suffix: ' 231 'actual %x expected (%x-%x)' % 232 (actual_suffix, 233 dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW, 234 dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH)) 235 236 # Verify delegated prefix. 237 delegated_prefix = dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX) 238 for x in range( 239 dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_LOW, 240 dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_HIGH+1): 241 valid_prefix = \ 242 dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_RANGE_FORMAT % x 243 if delegated_prefix == valid_prefix: 244 break; 245 else: 246 raise error.TestFail('Invalid delegated prefix: %s' % 247 (delegated_prefix)) 248 # Verify delegated prefix length. 249 delegated_prefix_length = \ 250 int(dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX_LENGTH)) 251 expected_delegated_prefix_length = \ 252 dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_PREFIX_LENGTH 253 if delegated_prefix_length != expected_delegated_prefix_length: 254 raise error.TestFail('Delegated prefix length mismatch: ' 255 'actual %d expected %d' % 256 (delegated_prefix_length, 257 expected_delegated_prefix_length)) 258 259 # Verify name servers. 260 actual_name_servers = dhcpv6_config.get(DHCPV6_KEY_NAMESERVERS) 261 expected_name_servers = \ 262 dhcpv6_test_server.DHCPV6_NAME_SERVERS.split(',') 263 if actual_name_servers != expected_name_servers: 264 raise error.TestFail('Name servers mismatch: actual %r expected %r' 265 % (actual_name_servers, expected_name_servers)) 266 # Verify domain search. 267 actual_domain_search = dhcpv6_config.get(DHCPV6_KEY_SEARCH_DOMAIN_LIST) 268 expected_domain_search = \ 269 dhcpv6_test_server.DHCPV6_DOMAIN_SEARCH.split(',') 270 if actual_domain_search != expected_domain_search: 271 raise error.TestFail('Domain search list mismatch: ' 272 'actual %r expected %r' % 273 (actual_domain_search, expected_domain_search)) 274