# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import dhcp_handling_rule
from autotest_lib.client.cros import dhcp_packet
from autotest_lib.client.cros import dhcp_test_base

# Length of time the lease from the DHCP server is valid.
LEASE_TIME_SECONDS = 3600
# We'll fill in the subnet and give this address to the client.
INTENDED_IP_SUFFIX = "0.0.0.101"


class network_DhcpNak(dhcp_test_base.DhcpTestBase):
    """
    Tests a DHCP client's handling of NAK messages.

    Negotiates a lease, and then tests how the DHCP client processes
    NAKs in several scenarios.  In the first scenario, the DHCP client
    is started anew, but with the cached lease on disk.  In the
    second scenario, an already-running DHCP client is asked to
    renew its lease.  The remaining scenarios have the server answer a
    single REQUEST with both an ACK and a NAK (in either order),
    including one where the ACKed address is the server's own address.

    In the first two scenarios, the NAK messages omit the DHCP server-id
    option.  This is to emulate some DHCP servers (e.g. OpenBSD 4.6),
    which omit the DHCP server-id in NAK messages.
    """

    def common_setup(self):
        """
        Run common setup steps.

        Computes the address the client is expected to receive, caches
        the client-side interface name, builds the DHCP options shared
        by every sub-test, and negotiates the initial lease.
        """
        subnet_mask = self.ethernet_pair.interface_subnet_mask
        self.intended_ip = dhcp_test_base.DhcpTestBase.rewrite_ip_suffix(
                subnet_mask, self.server_ip, INTENDED_IP_SUFFIX)
        self.interface_name = self.ethernet_pair.peer_interface_name
        self.dhcp_options = {
                dhcp_packet.OPTION_SERVER_ID : self.server_ip,
                dhcp_packet.OPTION_SUBNET_MASK : subnet_mask,
                dhcp_packet.OPTION_IP_LEASE_TIME : LEASE_TIME_SECONDS,
                dhcp_packet.OPTION_REQUESTED_IP : self.intended_ip,
                dhcp_packet.OPTION_DNS_SERVERS : [],
                dhcp_packet.OPTION_DOMAIN_NAME : '',
                dhcp_packet.OPTION_DNS_DOMAIN_SEARCH_LIST : [],
                }
        self.negotiate_and_check_lease(self.dhcp_options)

    def _offer_rule(self, offered_ip):
        """
        Build a rule that answers a DISCOVER by offering |offered_ip|.

        @param offered_ip: string IP address to offer to the client.
        @return a new DhcpHandlingRule_RespondToDiscovery instance.
        """
        return dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
                offered_ip, self.server_ip, self.dhcp_options, {})

    def _ack_rule(self, granted_ip):
        """
        Build a rule that answers a REQUEST by granting |granted_ip|.

        @param granted_ip: string IP address to grant to the client.
        @return a new DhcpHandlingRule_RespondToRequest instance.
        """
        return dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
                granted_ip, self.server_ip, self.dhcp_options, {})

    def _start_rules(self, rules):
        """
        Mark the last rule as final and hand the rules to the test server.

        @param rules: non-empty, ordered list of DHCP handling rules.
        """
        rules[-1].is_final_handler = True
        self.server.start_test(
                rules, dhcp_test_base.DHCP_NEGOTIATION_TIMEOUT_SECONDS)

    def reconnect_service(self):
        """
        Disconnect and reconnect Ethernet.

        Ask shill to disconnect and reconnect the Service for our
        virtual Ethernet link.  This causes shill to shut down and
        restart dhcpcd for the link.
        """
        service = self.find_ethernet_service(self.interface_name)
        service.Disconnect()
        self._start_rules([
                # Respond to DISCOVERY, but then NAK the REQUEST.
                self._offer_rule(self.intended_ip),
                dhcp_handling_rule.DhcpHandlingRule_RejectRequest(),

                # Allow a successful negotiation the second time around.
                self._offer_rule(self.intended_ip),
                self._ack_rule(self.intended_ip),
                ])
        service.Connect()

    def force_dhcp_renew(self):
        """
        Force a DHCP renewal.

        Ask shill to renew the DHCP lease associated with our
        virtual Ethernet link.
        """
        self._start_rules([
                # Reject REQUEST from renewal attempt.
                dhcp_handling_rule.DhcpHandlingRule_RejectRequest(),

                # Allow a successful negotiation after that.
                self._offer_rule(self.intended_ip),
                self._ack_rule(self.intended_ip),
                ])
        self.get_device(self.interface_name).RenewDHCPLease()

    def send_ack_then_nak(self):
        """
        Send an ACK followed by a NAK on re-connect to Ethernet.

        Ask shill to disconnect and reconnect the Service for our
        virtual Ethernet link.  This causes shill to shut down and
        restart dhcpcd for the link.  Then perform a test where
        the server responds to a REQUEST with an ACK followed by
        a NAK.
        """
        service = self.find_ethernet_service(self.interface_name)
        service.Disconnect()
        self._start_rules([
                # Respond to DISCOVERY, but then both ACK and NAK the REQUEST.
                self._offer_rule(self.intended_ip),
                # NOTE(review): the trailing boolean appears to select
                # NAK-before-ACK ordering (False here means ACK first);
                # confirm against DhcpHandlingRule_RejectAndRespondToRequest.
                dhcp_handling_rule.DhcpHandlingRule_RejectAndRespondToRequest(
                        self.intended_ip, self.server_ip, self.dhcp_options,
                        {}, False),
                ])
        service.Connect()

    def send_nak_then_ack_with_conflict(self):
        """
        Send a NAK followed by an ACK on re-connect, with an address conflict.

        Ask shill to disconnect and reconnect the Service for our
        virtual Ethernet link.  This causes shill to shut down and
        restart dhcpcd for the link.

        On reconnect, perform a test where the server responds to a
        REQUEST with a NAK followed by an ACK, however with a lease
        for an invalid address (the same IP address as the DHCP server).

        Ensure that the client rejects the invalid lease with a DECLINE,
        and that it also ignores the first OFFER for the same invalid
        address.
        """
        service = self.find_ethernet_service(self.interface_name)
        service.Disconnect()
        self._start_rules([
                # Respond to DISCOVERY, but then both NAK then ACK the REQUEST,
                # supplying the server's own IP address.
                self._offer_rule(self.server_ip),
                dhcp_handling_rule.DhcpHandlingRule_RejectAndRespondToRequest(
                        self.server_ip, self.server_ip, self.dhcp_options,
                        {}, True),

                # The client should eventually reject this lease since this
                # address is in use.
                dhcp_handling_rule.DhcpHandlingRule_AcceptDecline(
                        self.server_ip, self.dhcp_options, {}),

                # Offer up the same (invalid) IP address.
                self._offer_rule(self.server_ip),

                # The client should ignore the previous offer and perform
                # another DISCOVER request.
                self._offer_rule(self.intended_ip),
                self._ack_rule(self.intended_ip),
                ])
        service.Connect()

    def send_nak_then_ack_then_verify(self):
        """
        Send a NAK followed by an ACK then verify client IP address.

        Ask shill to disconnect and reconnect the Service for our
        virtual Ethernet link.  This causes shill to shut down and
        restart dhcpcd for the link.

        On reconnect, perform a test where the server responds to a
        REQUEST with a NAK followed by an ACK.  This method asserts
        that the client does not DECLINE this address.

        Raises error.TestFail if a DECLINE is received, if the test
        stops on any rule other than the DECLINE rule, or if the client
        does not end up with the intended address.
        """
        service = self.find_ethernet_service(self.interface_name)
        service.Disconnect()

        # This rule serves two purposes: First it asserts that the client
        # does not send a DECLINE response.  Second, it waits until the
        # test timeout, by which time client will have completed an "ARP
        # self" operation to validate the offered IP address.
        decline_rule = dhcp_handling_rule.DhcpHandlingRule_AcceptDecline(
                self.intended_ip, self.dhcp_options, {})

        self._start_rules([
                # Respond to DISCOVERY, but then both NAK then ACK the REQUEST,
                # supplying the (valid) intended IP address.
                self._offer_rule(self.intended_ip),
                dhcp_handling_rule.DhcpHandlingRule_RejectAndRespondToRequest(
                        self.intended_ip, self.server_ip, self.dhcp_options,
                        {}, True),
                decline_rule,
                ])
        service.Connect()
        self.server.wait_for_test_to_finish()

        # This is a negative test, since we expect the last rule to fail.
        if self.server.last_test_passed:
            raise error.TestFail('DHCP DECLINE message was received')
        if self.server.current_rule != decline_rule:
            raise error.TestFail('Failed on %s rule' % self.server.current_rule)

        # Use the interface name cached by common_setup(), as the other
        # sub-tests do, rather than re-reading it from the ethernet pair.
        dhcp_config = self.get_interface_ipconfig(self.interface_name)
        if dhcp_config is None:
            raise error.TestFail('Did not get a DHCP config')
        if dhcp_config[dhcp_test_base.DHCPCD_KEY_ADDRESS] != self.intended_ip:
            raise error.TestFail('Client did not attain expected address %s' %
                                 self.intended_ip)

    def test_body(self):
        """
        Entry point for this test.

        This is called from DhcpTestBase.run_once().

        Runs the common setup, then each sub-test in turn, failing with
        error.TestFail on the first sub-test whose rules do not all pass.
        """
        self.common_setup()
        for sub_test in (self.reconnect_service,
                         self.force_dhcp_renew,
                         self.send_ack_then_nak,
                         self.send_nak_then_ack_with_conflict):
            sub_test()
            self.server.wait_for_test_to_finish()
            if not self.server.last_test_passed:
                raise error.TestFail('Test failed (%s): active rule is %s' % (
                        sub_test.__name__, self.server.current_rule))

        # This method is outside the loop above since it performs its own
        # special verification.
        self.send_nak_then_ack_then_verify()