1# Lint as: python2, python3 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6""" 7Base class for DHCP tests. This class just sets up a little bit of plumbing, 8like a virtual ethernet device with one end that looks like a real ethernet 9device to shill and a DHCP test server on the end that doesn't look like a real 10ethernet interface to shill. Child classes should override test_body() with the 11logic of their test. The plumbing of DhcpTestBase is accessible via properties. 12""" 13 14from __future__ import absolute_import 15from __future__ import division 16from __future__ import print_function 17 18import logging 19from six.moves import filter 20from six.moves import range 21import socket 22import struct 23import time 24import traceback 25 26from autotest_lib.client.bin import test 27from autotest_lib.client.common_lib import error 28from autotest_lib.client.common_lib.cros import virtual_ethernet_pair 29from autotest_lib.client.cros import dhcp_handling_rule 30from autotest_lib.client.cros import dhcp_packet 31from autotest_lib.client.cros import dhcp_test_server 32from autotest_lib.client.cros.networking import shill_proxy 33 34 35# These are keys that may be used with the DBus dictionary returned from 36# DhcpTestBase.get_interface_ipconfig(). 37DHCPCD_KEY_NAMESERVERS = 'NameServers' 38DHCPCD_KEY_GATEWAY = 'Gateway' 39DHCPCD_KEY_BROADCAST_ADDR = 'Broadcast' 40DHCPCD_KEY_ADDRESS = 'Address' 41DHCPCD_KEY_PREFIX_LENGTH = 'Prefixlen' 42DHCPCD_KEY_DOMAIN_NAME = 'DomainName' 43DHCPCD_KEY_ACCEPTED_HOSTNAME = 'AcceptedHostname' 44DHCPCD_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains' 45 46# We should be able to complete a DHCP negotiation in this amount of time. 47DHCP_NEGOTIATION_TIMEOUT_SECONDS = 10 48 49# After DHCP completes, an ipconfig should appear shortly after 50IPCONFIG_POLL_COUNT = 5 51IPCONFIG_POLL_PERIOD_SECONDS = 0.5 52 53class DhcpTestBase(test.test): 54 """Parent class for tests that work verify DHCP behavior.""" 55 version = 1 56 57 @staticmethod 58 def rewrite_ip_suffix(subnet_mask, ip_in_subnet, ip_suffix): 59 """ 60 Create a new IPv4 address in a subnet by bitwise and'ing an existing 61 address |ip_in_subnet| with |subnet_mask| and bitwise or'ing in 62 |ip_suffix|. For safety, bitwise or the suffix with the complement of 63 the subnet mask. 64 65 Usage: rewrite_ip_suffix("255.255.255.0", "192.168.1.1", "0.0.0.105") 66 67 The example usage will return "192.168.1.105". 68 69 @param subnet_mask string subnet mask, e.g. "255.255.255.0" 70 @param ip_in_subnet string an IP address in the desired subnet 71 @param ip_suffix string suffix desired for new address, e.g. "0.0.0.105" 72 73 @return string IP address on in the same subnet with specified suffix. 74 75 """ 76 mask = struct.unpack('!I', socket.inet_aton(subnet_mask))[0] 77 subnet = mask & struct.unpack('!I', socket.inet_aton(ip_in_subnet))[0] 78 suffix = ~mask & struct.unpack('!I', socket.inet_aton(ip_suffix))[0] 79 return socket.inet_ntoa(struct.pack('!I', (subnet | suffix))) 80 81 82 def get_device(self, interface_name): 83 """Finds the corresponding Device object for an interface with 84 the name |interface_name|. 85 86 @param interface_name string The name of the interface to check. 87 88 @return DBus interface object representing the associated device. 89 90 """ 91 return self.shill_proxy.find_object('Device', 92 {'Name': interface_name}) 93 94 95 def find_ethernet_service(self, interface_name): 96 """Finds the corresponding service object for an Ethernet interface. 97 98 @param interface_name string The name of the associated interface 99 100 @return Service object representing the associated service. 101 102 """ 103 device = self.get_device(interface_name) 104 device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path) 105 return self.shill_proxy.find_object('Service', {'Device': device_path}) 106 107 108 def get_interface_ipconfig_objects(self, interface_name): 109 """ 110 Returns a list of dbus object proxies for |interface_name|. 111 Returns an empty list if no such interface exists. 112 113 @param interface_name string name of the device to query (e.g., "eth0"). 114 115 @return list of objects representing DBus IPConfig RPC endpoints. 116 117 """ 118 device = self.get_device(interface_name) 119 if device is None: 120 return [] 121 122 device_properties = device.GetProperties(utf8_strings=True) 123 proxy = self.shill_proxy 124 125 ipconfig_object = proxy.DBUS_TYPE_IPCONFIG 126 return list(filter(bool, 127 [ proxy.get_dbus_object(ipconfig_object, property_path) 128 for property_path in device_properties['IPConfigs'] ])) 129 130 131 def get_interface_ipconfig(self, interface_name): 132 """ 133 Returns a dictionary containing settings for an |interface_name| set 134 via DHCP. Returns None if no such interface or setting bundle on 135 that interface can be found in shill. 136 137 @param interface_name string name of the device to query (e.g., "eth0"). 138 139 @return dict containing the the properties of the IPConfig stripped 140 of DBus meta-data or None. 141 142 """ 143 dhcp_properties = None 144 for ipconfig in self.get_interface_ipconfig_objects(interface_name): 145 logging.info('Looking at ipconfig %r', ipconfig) 146 ipconfig_properties = ipconfig.GetProperties(utf8_strings=True) 147 if 'Method' not in ipconfig_properties: 148 logging.info('Found ipconfig object with no method field') 149 continue 150 if ipconfig_properties['Method'] != 'dhcp': 151 logging.info('Found ipconfig object with method != dhcp') 152 continue 153 if dhcp_properties != None: 154 raise error.TestFail('Found multiple ipconfig objects ' 155 'with method == dhcp') 156 dhcp_properties = ipconfig_properties 157 if dhcp_properties is None: 158 logging.info('Did not find IPConfig object with method == dhcp') 159 return None 160 logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties) 161 return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties) 162 163 164 def run_once(self): 165 self._server = None 166 self._server_ip = None 167 self._ethernet_pair = None 168 self._server = None 169 self._shill_proxy = shill_proxy.ShillProxy() 170 try: 171 self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair( 172 peer_interface_name='pseudoethernet0', 173 peer_interface_ip=None) 174 self._ethernet_pair.setup() 175 if not self._ethernet_pair.is_healthy: 176 raise error.TestFail('Could not create virtual ethernet pair.') 177 self._server_ip = self._ethernet_pair.interface_ip 178 self._server = dhcp_test_server.DhcpTestServer( 179 self._ethernet_pair.interface_name) 180 self._server.start() 181 if not self._server.is_healthy: 182 raise error.TestFail('Could not start DHCP test server.') 183 self._subnet_mask = self._ethernet_pair.interface_subnet_mask 184 self.test_body() 185 except (error.TestFail, error.TestNAError): 186 # Pass these through without modification. 187 raise 188 except Exception as e: 189 logging.error('Caught exception: %s.', str(e)) 190 logging.error('Trace: %s', traceback.format_exc()) 191 raise error.TestFail('Caught exception: %s.' % str(e)) 192 finally: 193 if self._server is not None: 194 self._server.stop() 195 if self._ethernet_pair is not None: 196 self._ethernet_pair.teardown() 197 198 def test_body(self): 199 """ 200 Override this method with the body of your test. You may safely assume 201 that the the properties exposed by DhcpTestBase correctly return 202 references to the test apparatus. 203 """ 204 raise error.TestFail('No test body implemented') 205 206 @property 207 def server_ip(self): 208 """ 209 Return the IP address of the side of the interface that the DHCP test 210 server is bound to. The server itself is bound the the broadcast 211 address on the interface. 212 """ 213 return self._server_ip 214 215 @property 216 def server(self): 217 """ 218 Returns a reference to the DHCP test server. Use this to add handlers 219 and run tests. 220 """ 221 return self._server 222 223 @property 224 def ethernet_pair(self): 225 """ 226 Returns a reference to the virtual ethernet pair created to run DHCP 227 tests on. 228 """ 229 return self._ethernet_pair 230 231 @property 232 def shill_proxy(self): 233 """ 234 Returns a the shill proxy instance. 235 """ 236 return self._shill_proxy 237 238 def negotiate_and_check_lease(self, 239 dhcp_options, 240 custom_fields={}, 241 disable_check=False): 242 """ 243 Perform DHCP lease negotiation, and ensure that the resulting 244 ipconfig matches the DHCP options provided to the server. 245 246 @param dhcp_options dict of properties the DHCP server should provide. 247 @param custom_fields dict of custom DHCP parameters to add to server. 248 @param disable_check bool whether to perform IPConfig parameter 249 checking. 250 251 """ 252 if dhcp_packet.OPTION_REQUESTED_IP not in dhcp_options: 253 raise error.TestFail('You must specify OPTION_REQUESTED_IP to ' 254 'negotiate a DHCP lease') 255 intended_ip = dhcp_options[dhcp_packet.OPTION_REQUESTED_IP] 256 # Build up the handling rules for the server and start the test. 257 rules = [] 258 rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery( 259 intended_ip, 260 self.server_ip, 261 dhcp_options, 262 custom_fields)) 263 rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest( 264 intended_ip, 265 self.server_ip, 266 dhcp_options, 267 custom_fields)) 268 rules[-1].is_final_handler = True 269 self.server.start_test(rules, DHCP_NEGOTIATION_TIMEOUT_SECONDS) 270 logging.info('Server is negotiating new lease with options: %s', 271 dhcp_options) 272 self.server.wait_for_test_to_finish() 273 if not self.server.last_test_passed: 274 raise error.TestFail( 275 'Test failed: active rule is %s' % self.server.current_rule) 276 277 if disable_check: 278 logging.info('Skipping check of negotiated DHCP lease parameters.') 279 else: 280 self.wait_for_dhcp_propagation() 281 self.check_dhcp_config(dhcp_options) 282 283 def wait_for_dhcp_propagation(self): 284 """ 285 Wait for configuration to propagate over dbus to shill. 286 TODO(wiley) Make this event based. This is pretty sloppy. 287 """ 288 time.sleep(0.1) 289 290 def check_dhcp_config(self, dhcp_options): 291 """ 292 Compare the DHCP ipconfig with DHCP lease parameters to ensure 293 that the DUT attained the correct values. 294 295 @param dhcp_options dict of properties the DHCP server provided. 296 297 """ 298 # The config is what the interface was actually configured with, as 299 # opposed to dhcp_options, which is what the server expected it be 300 # configured with. 301 for attempt in range(IPCONFIG_POLL_COUNT): 302 dhcp_config = self.get_interface_ipconfig( 303 self.ethernet_pair.peer_interface_name) 304 if dhcp_config is not None: 305 break 306 time.sleep(IPCONFIG_POLL_PERIOD_SECONDS) 307 else: 308 raise error.TestFail('Failed to retrieve DHCP ipconfig object ' 309 'from shill.') 310 311 logging.debug('Got DHCP config: %s', str(dhcp_config)) 312 expected_address = dhcp_options.get(dhcp_packet.OPTION_REQUESTED_IP) 313 configured_address = dhcp_config.get(DHCPCD_KEY_ADDRESS) 314 if expected_address != configured_address: 315 raise error.TestFail('Interface configured with IP address not ' 316 'granted by the DHCP server after DHCP ' 317 'negotiation. Expected %s but got %s.' % 318 (expected_address, configured_address)) 319 320 # While DNS related settings only propagate to the system when the 321 # service is marked as the default service, we can still check the 322 # IP address on the interface, since that is set immediately. 323 interface_address = self.ethernet_pair.peer_interface_ip 324 if expected_address != interface_address: 325 raise error.TestFail('shill somehow knew about the proper DHCP ' 326 'assigned address: %s, but configured the ' 327 'interface with something completely ' 328 'different: %s.' % 329 (expected_address, interface_address)) 330 331 expected_dns_servers = dhcp_options.get(dhcp_packet.OPTION_DNS_SERVERS) 332 configured_dns_servers = dhcp_config.get(DHCPCD_KEY_NAMESERVERS) 333 if (expected_dns_servers is not None and 334 expected_dns_servers != configured_dns_servers): 335 raise error.TestFail('Expected to be configured with DNS server ' 336 'list %s, but was configured with %s ' 337 'instead.' % (expected_dns_servers, 338 configured_dns_servers)) 339 340 expected_domain_name = dhcp_options.get(dhcp_packet.OPTION_DOMAIN_NAME) 341 configured_domain_name = dhcp_config.get(DHCPCD_KEY_DOMAIN_NAME) 342 if (expected_domain_name is not None and 343 expected_domain_name != configured_domain_name): 344 raise error.TestFail('Expected to be configured with domain ' 345 'name %s, but got %s instead.' % 346 (expected_domain_name, configured_domain_name)) 347 348 expected_host_name = dhcp_options.get(dhcp_packet.OPTION_HOST_NAME) 349 configured_host_name = dhcp_config.get(DHCPCD_KEY_ACCEPTED_HOSTNAME) 350 if (expected_host_name is not None and 351 expected_host_name != configured_host_name): 352 raise error.TestFail('Expected to be configured with host ' 353 'name %s, but got %s instead.' % 354 (expected_host_name, configured_host_name)) 355 356 expected_search_list = dhcp_options.get( 357 dhcp_packet.OPTION_DNS_DOMAIN_SEARCH_LIST) 358 configured_search_list = dhcp_config.get(DHCPCD_KEY_SEARCH_DOMAIN_LIST) 359 if (expected_search_list is not None and 360 expected_search_list != configured_search_list): 361 raise error.TestFail('Expected to be configured with domain ' 362 'search list %s, but got %s instead.' % 363 (expected_search_list, configured_search_list)) 364 365 expected_routers = dhcp_options.get(dhcp_packet.OPTION_ROUTERS) 366 if (not expected_routers and 367 dhcp_options.get(dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES)): 368 classless_static_routes = dhcp_options[ 369 dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES] 370 for prefix, destination, gateway in classless_static_routes: 371 if not prefix: 372 logging.info('Using %s as the default gateway', gateway) 373 expected_routers = [ gateway ] 374 break 375 configured_router = dhcp_config.get(DHCPCD_KEY_GATEWAY) 376 if expected_routers and expected_routers[0] != configured_router: 377 raise error.TestFail('Expected to be configured with gateway %s, ' 378 'but got %s instead.' % 379 (expected_routers[0], configured_router)) 380 381 self.server.wait_for_test_to_finish() 382 if not self.server.last_test_passed: 383 raise error.TestFail('Test server didn\'t get all the messages it ' 384 'was told to expect for renewal.') 385