1# Lint as: python2, python3 2# Copyright 2015 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6""" 7Base class for DHCPv6 tests. This class just sets up a little bit of plumbing, 8like a virtual ethernet device with one end that looks like a real ethernet 9device to shill and a DHCPv6 test server on the end that doesn't look like a 10real ethernet interface to shill. Child classes should override test_body() 11with the logic of their test. 12""" 13 14from __future__ import absolute_import 15from __future__ import division 16from __future__ import print_function 17 18import logging 19from six.moves import filter 20from six.moves import range 21import time 22import traceback 23 24from autotest_lib.client.bin import test 25from autotest_lib.client.common_lib import error 26from autotest_lib.client.common_lib.cros import virtual_ethernet_pair 27from autotest_lib.client.cros import dhcpv6_test_server 28from autotest_lib.client.cros.networking import shill_proxy 29 30# These are keys that may be used with the DBus dictionary returned from 31# Dhcpv6TestBase.get_interface_ipconfig(). 32DHCPV6_KEY_ADDRESS = 'Address' 33DHCPV6_KEY_DELEGATED_PREFIX = 'DelegatedPrefix' 34DHCPV6_KEY_DELEGATED_PREFIX_LENGTH = 'DelegatedPrefixLength' 35DHCPV6_KEY_NAMESERVERS = 'NameServers' 36DHCPV6_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains' 37 38# After DHCPv6 completes, an ipconfig should appear shortly after 39IPCONFIG_POLL_COUNT = 5 40IPCONFIG_POLL_PERIOD_SECONDS = 1 41 42class Dhcpv6TestBase(test.test): 43 """Parent class for tests that work verify DHCPv6 behavior.""" 44 version = 1 45 46 def get_device(self, interface_name): 47 """Finds the corresponding Device object for an interface with 48 the name |interface_name|. 49 50 @param interface_name string The name of the interface to check. 51 52 @return DBus interface object representing the associated device. 53 54 """ 55 return self.shill_proxy.find_object('Device', 56 {'Name': interface_name}) 57 58 59 def find_ethernet_service(self, interface_name): 60 """Finds the corresponding service object for an Ethernet interface. 61 62 @param interface_name string The name of the associated interface 63 64 @return Service object representing the associated service. 65 66 """ 67 device = self.get_device(interface_name) 68 device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path) 69 return self.shill_proxy.find_object('Service', {'Device': device_path}) 70 71 72 def get_interface_ipconfig_objects(self, interface_name): 73 """ 74 Returns a list of dbus object proxies for |interface_name|. 75 Returns an empty list if no such interface exists. 76 77 @param interface_name string name of the device to query (e.g., "eth0"). 78 79 @return list of objects representing DBus IPConfig RPC endpoints. 80 81 """ 82 device = self.get_device(interface_name) 83 if device is None: 84 return [] 85 86 device_properties = device.GetProperties(utf8_strings=True) 87 proxy = self.shill_proxy 88 89 ipconfig_object = proxy.DBUS_TYPE_IPCONFIG 90 return list(filter(bool, 91 [ proxy.get_dbus_object(ipconfig_object, property_path) 92 for property_path in device_properties['IPConfigs'] ])) 93 94 95 def get_interface_ipconfig(self, interface_name): 96 """ 97 Returns a dictionary containing settings for an |interface_name| set 98 via DHCPv6. Returns None if no such interface or setting bundle on 99 that interface can be found in shill. 100 101 @param interface_name string name of the device to query (e.g., "eth0"). 102 103 @return dict containing the the properties of the IPConfig stripped 104 of DBus meta-data or None. 105 106 """ 107 dhcp_properties = None 108 for ipconfig in self.get_interface_ipconfig_objects(interface_name): 109 logging.info('Looking at ipconfig %r', ipconfig) 110 ipconfig_properties = ipconfig.GetProperties(utf8_strings=True) 111 if 'Method' not in ipconfig_properties: 112 logging.info('Found ipconfig object with no method field') 113 continue 114 if ipconfig_properties['Method'] != 'dhcp6': 115 logging.info('Found ipconfig object with method != dhcp6') 116 continue 117 if dhcp_properties != None: 118 raise error.TestFail('Found multiple ipconfig objects ' 119 'with method == dhcp6') 120 dhcp_properties = ipconfig_properties 121 if dhcp_properties is None: 122 logging.info('Did not find IPConfig object with method == dhcp6') 123 return None 124 logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties) 125 return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties) 126 127 128 def run_once(self): 129 self._server = None 130 self._server_ip = None 131 self._ethernet_pair = None 132 self._shill_proxy = shill_proxy.ShillProxy() 133 try: 134 # TODO(zqiu): enable DHCPv6 for peer interface, either by restarting 135 # shill with appropriate command line options or via a new DBUS 136 # command. 137 self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair( 138 interface_ip=None, 139 peer_interface_name='pseudoethernet0', 140 peer_interface_ip=None, 141 interface_ipv6=dhcpv6_test_server.DHCPV6_SERVER_ADDRESS) 142 self._ethernet_pair.setup() 143 if not self._ethernet_pair.is_healthy: 144 raise error.TestFail('Could not create virtual ethernet pair.') 145 self._server_ip = self._ethernet_pair.interface_ip 146 self._server = dhcpv6_test_server.Dhcpv6TestServer( 147 self._ethernet_pair.interface_name) 148 self._server.start() 149 self.test_body() 150 except (error.TestFail, error.TestNAError): 151 # Pass these through without modification. 152 raise 153 except Exception as e: 154 logging.error('Caught exception: %s.', str(e)) 155 logging.error('Trace: %s', traceback.format_exc()) 156 raise error.TestFail('Caught exception: %s.' % str(e)) 157 finally: 158 if self._server is not None: 159 self._server.stop() 160 if self._ethernet_pair is not None: 161 self._ethernet_pair.teardown() 162 163 def test_body(self): 164 """ 165 Override this method with the body of your test. You may safely assume 166 that the the properties exposed by DhcpTestBase correctly return 167 references to the test apparatus. 168 """ 169 raise error.TestFail('No test body implemented') 170 171 @property 172 def server_ip(self): 173 """ 174 Return the IP address of the side of the interface that the DHCPv6 test 175 server is bound to. The server itself is bound the the broadcast 176 address on the interface. 177 """ 178 return self._server_ip 179 180 @property 181 def server(self): 182 """ 183 Returns a reference to the DHCP test server. Use this to add handlers 184 and run tests. 185 """ 186 return self._server 187 188 @property 189 def ethernet_pair(self): 190 """ 191 Returns a reference to the virtual ethernet pair created to run DHCP 192 tests on. 193 """ 194 return self._ethernet_pair 195 196 @property 197 def shill_proxy(self): 198 """ 199 Returns a the shill proxy instance. 200 """ 201 return self._shill_proxy 202 203 204 def check_dhcpv6_config(self): 205 """ 206 Compare the DHCPv6 ipconfig with DHCP lease parameters to ensure 207 that the DUT attained the correct values. 208 209 """ 210 # Retrieve DHCPv6 configuration. 211 for attempt in range(IPCONFIG_POLL_COUNT): 212 dhcpv6_config = self.get_interface_ipconfig( 213 self.ethernet_pair.peer_interface_name) 214 # Wait until both IP address and delegated prefix are obtained. 215 if (dhcpv6_config is not None and 216 dhcpv6_config.get(DHCPV6_KEY_ADDRESS) and 217 dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX)): 218 break; 219 time.sleep(IPCONFIG_POLL_PERIOD_SECONDS) 220 else: 221 raise error.TestFail('Failed to retrieve DHCPv6 ipconfig object ' 222 'from shill.') 223 224 # Verify Non-temporary Address prefix. 225 address = dhcpv6_config.get(DHCPV6_KEY_ADDRESS) 226 actual_prefix = address[:address.index('::')] 227 expected_prefix = dhcpv6_test_server.DHCPV6_SERVER_SUBNET_PREFIX[: 228 dhcpv6_test_server.DHCPV6_SERVER_SUBNET_PREFIX.index('::')] 229 if actual_prefix != expected_prefix: 230 raise error.TestFail('Address prefix mismatch: ' 231 'actual %s expected %s.' % 232 (actual_prefix, expected_prefix)) 233 # Verify Non-temporary Address suffix. 234 actual_suffix = int(address[address.index('::')+2:], 16) 235 if (actual_suffix < dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW or 236 actual_suffix > dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH): 237 raise error.TestFail('Invalid address suffix: ' 238 'actual %x expected (%x-%x)' % 239 (actual_suffix, 240 dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW, 241 dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH)) 242 243 # Verify delegated prefix. 244 delegated_prefix = dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX) 245 for x in range( 246 dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_LOW, 247 dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_HIGH+1): 248 valid_prefix = \ 249 dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_RANGE_FORMAT % x 250 if delegated_prefix == valid_prefix: 251 break; 252 else: 253 raise error.TestFail('Invalid delegated prefix: %s' % 254 (delegated_prefix)) 255 # Verify delegated prefix length. 256 delegated_prefix_length = \ 257 int(dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX_LENGTH)) 258 expected_delegated_prefix_length = \ 259 dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_PREFIX_LENGTH 260 if delegated_prefix_length != expected_delegated_prefix_length: 261 raise error.TestFail('Delegated prefix length mismatch: ' 262 'actual %d expected %d' % 263 (delegated_prefix_length, 264 expected_delegated_prefix_length)) 265 266 # Verify name servers. 267 actual_name_servers = dhcpv6_config.get(DHCPV6_KEY_NAMESERVERS) 268 expected_name_servers = \ 269 dhcpv6_test_server.DHCPV6_NAME_SERVERS.split(',') 270 if actual_name_servers != expected_name_servers: 271 raise error.TestFail('Name servers mismatch: actual %r expected %r' 272 % (actual_name_servers, expected_name_servers)) 273 # Verify domain search. 274 actual_domain_search = dhcpv6_config.get(DHCPV6_KEY_SEARCH_DOMAIN_LIST) 275 expected_domain_search = \ 276 dhcpv6_test_server.DHCPV6_DOMAIN_SEARCH.split(',') 277 if actual_domain_search != expected_domain_search: 278 raise error.TestFail('Domain search list mismatch: ' 279 'actual %r expected %r' % 280 (actual_domain_search, expected_domain_search)) 281