# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
Base class for DHCP tests.  This class just sets up a little bit of plumbing,
like a virtual ethernet device with one end that looks like a real ethernet
device to shill and a DHCP test server on the end that doesn't look like a
real ethernet interface to shill.  Child classes should override test_body()
with the logic of their test.  The plumbing of DhcpTestBase is accessible via
properties.
"""

import logging
import socket
import struct
import time
import traceback

from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
from autotest_lib.client.cros import dhcp_handling_rule
from autotest_lib.client.cros import dhcp_packet
from autotest_lib.client.cros import dhcp_test_server
from autotest_lib.client.cros.networking import shill_proxy

# These are keys that may be used with the DBus dictionary returned from
# DhcpTestBase.get_interface_ipconfig().
DHCPCD_KEY_NAMESERVERS = 'NameServers'
DHCPCD_KEY_GATEWAY = 'Gateway'
DHCPCD_KEY_BROADCAST_ADDR = 'Broadcast'
DHCPCD_KEY_ADDRESS = 'Address'
DHCPCD_KEY_PREFIX_LENGTH = 'Prefixlen'
DHCPCD_KEY_DOMAIN_NAME = 'DomainName'
DHCPCD_KEY_ACCEPTED_HOSTNAME = 'AcceptedHostname'
DHCPCD_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains'

# We should be able to complete a DHCP negotiation in this amount of time.
DHCP_NEGOTIATION_TIMEOUT_SECONDS = 10

# After DHCP completes, an ipconfig should appear shortly after.
IPCONFIG_POLL_COUNT = 5
IPCONFIG_POLL_PERIOD_SECONDS = 0.5


class DhcpTestBase(test.test):
    """Parent class for tests that verify DHCP behavior."""
    version = 1

    @staticmethod
    def rewrite_ip_suffix(subnet_mask, ip_in_subnet, ip_suffix):
        """
        Create a new IPv4 address in a subnet by bitwise and'ing an existing
        address |ip_in_subnet| with |subnet_mask| and bitwise or'ing in
        |ip_suffix|.  For safety, the suffix is first and'ed with the
        complement of the subnet mask so it cannot alter the network part.

        Usage: rewrite_ip_suffix("255.255.255.0", "192.168.1.1", "0.0.0.105")

        The example usage will return "192.168.1.105".

        @param subnet_mask string subnet mask, e.g. "255.255.255.0"
        @param ip_in_subnet string an IP address in the desired subnet
        @param ip_suffix string suffix desired for new address, e.g.
                "0.0.0.105"

        @return string IP address in the same subnet with specified suffix.

        """
        mask = struct.unpack('!I', socket.inet_aton(subnet_mask))[0]
        subnet = mask & struct.unpack('!I', socket.inet_aton(ip_in_subnet))[0]
        suffix = ~mask & struct.unpack('!I', socket.inet_aton(ip_suffix))[0]
        return socket.inet_ntoa(struct.pack('!I', (subnet | suffix)))

    def get_device(self, interface_name):
        """
        Finds the corresponding Device object for an interface with the name
        |interface_name|.

        @param interface_name string The name of the interface to check.

        @return DBus interface object representing the associated device.

        """
        return self.shill_proxy.find_object('Device',
                                            {'Name': interface_name})

    def find_ethernet_service(self, interface_name):
        """
        Finds the corresponding service object for an Ethernet interface.

        @param interface_name string The name of the associated interface

        @return Service object representing the associated service.

        """
        device = self.get_device(interface_name)
        device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path)
        return self.shill_proxy.find_object('Service', {'Device': device_path})

    def get_interface_ipconfig_objects(self, interface_name):
        """
        Returns a list of dbus object proxies for |interface_name|.
        Returns an empty list if no such interface exists.

        @param interface_name string name of the device to query (e.g.,
                "eth0").

        @return list of objects representing DBus IPConfig RPC endpoints.

        """
        device = self.get_device(interface_name)
        if device is None:
            return []

        device_properties = device.GetProperties(utf8_strings=True)
        proxy = self.shill_proxy
        ipconfig_type = proxy.DBUS_TYPE_IPCONFIG
        candidates = (proxy.get_dbus_object(ipconfig_type, property_path)
                      for property_path in device_properties['IPConfigs'])
        # Build a real list (not a lazy filter object) so the documented
        # return type holds on every Python version; falsy lookups are dropped.
        return [ipconfig for ipconfig in candidates if ipconfig]

    def get_interface_ipconfig(self, interface_name):
        """
        Returns a dictionary containing settings for an |interface_name| set
        via DHCP.  Returns None if no such interface or setting bundle on
        that interface can be found in shill.

        @param interface_name string name of the device to query (e.g.,
                "eth0").

        @return dict containing the properties of the IPConfig stripped of
                DBus meta-data or None.

        @raises error.TestFail if more than one IPConfig object on the
                interface reports method == dhcp.

        """
        dhcp_properties = None
        for ipconfig in self.get_interface_ipconfig_objects(interface_name):
            logging.info('Looking at ipconfig %r', ipconfig)
            ipconfig_properties = ipconfig.GetProperties(utf8_strings=True)
            if 'Method' not in ipconfig_properties:
                logging.info('Found ipconfig object with no method field')
                continue
            if ipconfig_properties['Method'] != 'dhcp':
                logging.info('Found ipconfig object with method != dhcp')
                continue
            if dhcp_properties is not None:
                raise error.TestFail('Found multiple ipconfig objects '
                                     'with method == dhcp')
            dhcp_properties = ipconfig_properties

        if dhcp_properties is None:
            logging.info('Did not find IPConfig object with method == dhcp')
            return None

        logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties)
        return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties)

    def run_once(self):
        """
        Builds the test apparatus, runs test_body(), and tears everything
        down again.

        Creates a virtual ethernet pair and a DHCP test server bound to it,
        then calls test_body().  error.TestFail and error.TestNAError pass
        through unchanged; any other exception is logged with its traceback
        and converted into error.TestFail.

        @raises error.TestFail if the ethernet pair or the server cannot be
                brought up, or if test_body() raises an unexpected exception.

        """
        self._server = None
        self._server_ip = None
        self._ethernet_pair = None
        self._shill_proxy = shill_proxy.ShillProxy()
        try:
            self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair(
                    peer_interface_name='pseudoethernet0',
                    peer_interface_ip=None)
            self._ethernet_pair.setup()
            if not self._ethernet_pair.is_healthy:
                raise error.TestFail('Could not create virtual ethernet pair.')

            self._server_ip = self._ethernet_pair.interface_ip
            self._server = dhcp_test_server.DhcpTestServer(
                    self._ethernet_pair.interface_name)
            self._server.start()
            if not self._server.is_healthy:
                raise error.TestFail('Could not start DHCP test server.')

            self._subnet_mask = self._ethernet_pair.interface_subnet_mask
            self.test_body()
        except (error.TestFail, error.TestNAError):
            # Pass these through without modification.
            raise
        except Exception as e:
            logging.error('Caught exception: %s.', str(e))
            logging.error('Trace: %s', traceback.format_exc())
            raise error.TestFail('Caught exception: %s.' % str(e))
        finally:
            # Nested so that a failure while stopping the server cannot
            # prevent the virtual ethernet pair from being torn down.
            try:
                if self._server is not None:
                    self._server.stop()
            finally:
                if self._ethernet_pair is not None:
                    self._ethernet_pair.teardown()

    def test_body(self):
        """
        Override this method with the body of your test.  You may safely
        assume that the properties exposed by DhcpTestBase correctly return
        references to the test apparatus.

        @raises error.TestFail always, unless overridden by a child class.

        """
        raise error.TestFail('No test body implemented')

    @property
    def server_ip(self):
        """
        Return the IP address of the side of the interface that the DHCP test
        server is bound to.  The server itself is bound to the broadcast
        address on the interface.
        """
        return self._server_ip

    @property
    def server(self):
        """
        Returns a reference to the DHCP test server.  Use this to add
        handlers and run tests.
        """
        return self._server

    @property
    def ethernet_pair(self):
        """
        Returns a reference to the virtual ethernet pair created to run DHCP
        tests on.
        """
        return self._ethernet_pair

    @property
    def shill_proxy(self):
        """
        Returns the shill proxy instance.
        """
        return self._shill_proxy

    def negotiate_and_check_lease(self,
                                  dhcp_options,
                                  custom_fields=None,
                                  disable_check=False):
        """
        Perform DHCP lease negotiation, and ensure that the resulting
        ipconfig matches the DHCP options provided to the server.

        @param dhcp_options dict of properties the DHCP server should
                provide.  Must contain dhcp_packet.OPTION_REQUESTED_IP.
        @param custom_fields dict of custom DHCP parameters to add to server.
                Defaults to an empty dict.
        @param disable_check bool True to skip IPConfig parameter checking
                after the negotiation.

        @raises error.TestFail if OPTION_REQUESTED_IP is missing, if the
                server reports that the negotiation failed, or if the
                resulting configuration does not match |dhcp_options|.

        """
        # A fresh dict per call; a shared mutable default would leak state
        # between calls if anything downstream modified it.
        if custom_fields is None:
            custom_fields = {}

        if dhcp_packet.OPTION_REQUESTED_IP not in dhcp_options:
            raise error.TestFail('You must specify OPTION_REQUESTED_IP to '
                                 'negotiate a DHCP lease')
        intended_ip = dhcp_options[dhcp_packet.OPTION_REQUESTED_IP]

        # Build up the handling rules for the server and start the test.
        rules = [
            dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
                    intended_ip, self.server_ip, dhcp_options, custom_fields),
            dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
                    intended_ip, self.server_ip, dhcp_options, custom_fields),
        ]
        rules[-1].is_final_handler = True
        self.server.start_test(rules, DHCP_NEGOTIATION_TIMEOUT_SECONDS)
        logging.info('Server is negotiating new lease with options: %s',
                     dhcp_options)
        self.server.wait_for_test_to_finish()
        if not self.server.last_test_passed:
            raise error.TestFail(
                    'Test failed: active rule is %s' %
                    self.server.current_rule)

        if disable_check:
            logging.info('Skipping check of negotiated DHCP lease parameters.')
        else:
            self.wait_for_dhcp_propagation()
            self.check_dhcp_config(dhcp_options)

    def wait_for_dhcp_propagation(self):
        """
        Wait for configuration to propagate over dbus to shill.
        TODO(wiley) Make this event based.  This is pretty sloppy.
        """
        time.sleep(0.1)

    def check_dhcp_config(self, dhcp_options):
        """
        Compare the DHCP ipconfig with DHCP lease parameters to ensure
        that the DUT attained the correct values.

        The ipconfig is polled up to IPCONFIG_POLL_COUNT times, sleeping
        IPCONFIG_POLL_PERIOD_SECONDS after each unsuccessful attempt.  The
        address is always compared; DNS servers, domain name, host name,
        search list and gateway are compared only when |dhcp_options|
        supplies an expectation for them.

        @param dhcp_options dict of properties the DHCP server provided.

        @raises error.TestFail if no DHCP ipconfig appears, if any checked
                value differs from the expectation, or if the test server
                reports failure.

        """
        # The config is what the interface was actually configured with, as
        # opposed to dhcp_options, which is what the server expected it be
        # configured with.
        for _ in range(IPCONFIG_POLL_COUNT):
            dhcp_config = self.get_interface_ipconfig(
                    self.ethernet_pair.peer_interface_name)
            if dhcp_config is not None:
                break
            time.sleep(IPCONFIG_POLL_PERIOD_SECONDS)
        else:
            raise error.TestFail('Failed to retrieve DHCP ipconfig object '
                                 'from shill.')

        logging.debug('Got DHCP config: %s', str(dhcp_config))
        expected_address = dhcp_options.get(dhcp_packet.OPTION_REQUESTED_IP)
        configured_address = dhcp_config.get(DHCPCD_KEY_ADDRESS)
        if expected_address != configured_address:
            raise error.TestFail('Interface configured with IP address not '
                                 'granted by the DHCP server after DHCP '
                                 'negotiation.  Expected %s but got %s.' %
                                 (expected_address, configured_address))

        # While DNS related settings only propagate to the system when the
        # service is marked as the default service, we can still check the
        # IP address on the interface, since that is set immediately.
        interface_address = self.ethernet_pair.peer_interface_ip
        if expected_address != interface_address:
            raise error.TestFail('shill somehow knew about the proper DHCP '
                                 'assigned address: %s, but configured the '
                                 'interface with something completely '
                                 'different: %s.' %
                                 (expected_address, interface_address))

        expected_dns_servers = dhcp_options.get(dhcp_packet.OPTION_DNS_SERVERS)
        configured_dns_servers = dhcp_config.get(DHCPCD_KEY_NAMESERVERS)
        if (expected_dns_servers is not None and
            expected_dns_servers != configured_dns_servers):
            raise error.TestFail('Expected to be configured with DNS server '
                                 'list %s, but was configured with %s '
                                 'instead.' %
                                 (expected_dns_servers,
                                  configured_dns_servers))

        expected_domain_name = dhcp_options.get(dhcp_packet.OPTION_DOMAIN_NAME)
        configured_domain_name = dhcp_config.get(DHCPCD_KEY_DOMAIN_NAME)
        if (expected_domain_name is not None and
            expected_domain_name != configured_domain_name):
            raise error.TestFail('Expected to be configured with domain '
                                 'name %s, but got %s instead.' %
                                 (expected_domain_name,
                                  configured_domain_name))

        expected_host_name = dhcp_options.get(dhcp_packet.OPTION_HOST_NAME)
        configured_host_name = dhcp_config.get(DHCPCD_KEY_ACCEPTED_HOSTNAME)
        if (expected_host_name is not None and
            expected_host_name != configured_host_name):
            raise error.TestFail('Expected to be configured with host '
                                 'name %s, but got %s instead.' %
                                 (expected_host_name,
                                  configured_host_name))

        expected_search_list = dhcp_options.get(
                dhcp_packet.OPTION_DNS_DOMAIN_SEARCH_LIST)
        configured_search_list = dhcp_config.get(DHCPCD_KEY_SEARCH_DOMAIN_LIST)
        if (expected_search_list is not None and
            expected_search_list != configured_search_list):
            raise error.TestFail('Expected to be configured with domain '
                                 'search list %s, but got %s instead.' %
                                 (expected_search_list,
                                  configured_search_list))

        expected_routers = dhcp_options.get(dhcp_packet.OPTION_ROUTERS)
        if (not expected_routers and
            dhcp_options.get(dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES)):
            # No explicit router option: fall back to the first classless
            # static route whose prefix is falsy and use its gateway.
            classless_static_routes = dhcp_options[
                    dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES]
            for prefix, _destination, gateway in classless_static_routes:
                if not prefix:
                    logging.info('Using %s as the default gateway', gateway)
                    expected_routers = [gateway]
                    break

        configured_router = dhcp_config.get(DHCPCD_KEY_GATEWAY)
        if expected_routers and expected_routers[0] != configured_router:
            raise error.TestFail('Expected to be configured with gateway %s, '
                                 'but got %s instead.' %
                                 (expected_routers[0], configured_router))

        # Re-check the server's verdict once the configuration is verified.
        self.server.wait_for_test_to_finish()
        if not self.server.last_test_passed:
            raise error.TestFail('Test server didn\'t get all the messages it '
                                 'was told to expect for renewal.')