#!/usr/bin/python2

# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
Self-checks for the DHCP test helpers: packet/option parsing and serialization
in dhcp_packet, plus a DISCOVERY/OFFER/REQUEST/ACK exchange against
DhcpTestServer over the loopback interface.
"""

import logging
import socket
import sys
import time

import common

from autotest_lib.client.cros import dhcp_handling_rule
from autotest_lib.client.cros import dhcp_packet
from autotest_lib.client.cros import dhcp_test_server

TEST_DATA_PATH_PREFIX = "client/cros/dhcp_test_data/"

# Unprivileged stand-ins for the real DHCP ports so that the tests can run
# without root permissions.
TEST_SERVER_PORT = 8067
TEST_CLIENT_PORT = 8068

# Binary form of TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED: for each route, a
# prefix length, the significant destination bytes, then the router address.
TEST_CLASSLESS_STATIC_ROUTE_DATA = (
        "\x12\x0a\x09\xc0\xac\x1f\x9b\x0a"
        "\x00\xc0\xa8\x00\xfe")

TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED = [
        (18, "10.9.192.0", "172.31.155.10"),
        (0, "0.0.0.0", "192.168.0.254")
        ]

# The trailing "\xC0\x04" is a compression pointer back to offset 4
# ("google.com") of this same buffer.
TEST_DOMAIN_SEARCH_LIST_COMPRESSED = \
        "\x03eng\x06google\x03com\x00\x09marketing\xC0\x04"

TEST_DOMAIN_SEARCH_LIST_PARSED = ("eng.google.com", "marketing.google.com")

# At this time, we don't support the compression allowed in the RFC.
# This is correct and sufficient for our purposes.
TEST_DOMAIN_SEARCH_LIST_EXPECTED = \
        "\x03eng\x06google\x03com\x00\x09marketing\x06google\x03com\x00"

# Two separate option entries (code byte "w" == 0x77, then a length byte) that
# each carry one domain of the search list.
TEST_DOMAIN_SEARCH_LIST1 = \
        "w\x10\x03eng\x06google\x03com\x00"

TEST_DOMAIN_SEARCH_LIST2 = \
        "w\x16\x09marketing\x06google\x03com\x00"


def bin2hex(byte_str, justification=20):
    """
    Turn big hex strings into prettier strings of hex bytes.  Group those hex
    bytes into lines justification bytes long.

    @param byte_str: byte string to render.
    @param justification: number of bytes rendered on each output line.
    @return string with every byte written as "x" plus two hex digits, and
            lines joined by newlines.
    """
    chars = ["x%02x" % ord(c) for c in byte_str]
    groups = ["".join(chars[i:i + justification])
              for i in range(0, len(chars), justification)]
    return "\n".join(groups)


def test_packet_serialization():
    """
    Parse a captured DISCOVERY packet and check that serializing the parsed
    packet reproduces the captured bytes exactly.

    @return True iff the round trip reproduces the input.
    """
    # The context manager closes the log even if read() raises.
    with open(TEST_DATA_PATH_PREFIX + "dhcp_discovery.log", "rb") as log_file:
        binary_discovery_packet = log_file.read()
    discovery_packet = dhcp_packet.DhcpPacket(byte_str=binary_discovery_packet)
    if not discovery_packet.is_valid:
        print("Discovery packet read from the log file is not valid.")
        return False
    generated_string = discovery_packet.to_binary_string()
    if generated_string is None:
        print("Failed to generate string from packet object.")
        return False
    if generated_string != binary_discovery_packet:
        print("Packets didn't match: ")
        print("Generated: \n%s" % bin2hex(generated_string))
        print("Expected: \n%s" % bin2hex(binary_discovery_packet))
        return False
    print("test_packet_serialization PASSED")
    return True


def test_classless_static_route_parsing():
    """
    Check that the binary classless static route option unpacks into the
    expected list of routes.

    @return True iff the parsed routes match.
    """
    parsed_routes = dhcp_packet.ClasslessStaticRoutesOption.unpack(
            TEST_CLASSLESS_STATIC_ROUTE_DATA)
    if parsed_routes != TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED:
        print("Parsed binary route list and got %s but expected %s" %
              (repr(parsed_routes),
               repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED)))
        return False
    print("test_classless_static_route_parsing PASSED")
    return True


def test_classless_static_route_serialization():
    """
    Check that the list of routes packs into the expected binary option data.

    @return True iff the packed bytes match.
    """
    byte_string = dhcp_packet.ClasslessStaticRoutesOption.pack(
            TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED)
    if byte_string != TEST_CLASSLESS_STATIC_ROUTE_DATA:
        # Turn the strings into printable hex strings on a single line.
        pretty_actual = bin2hex(byte_string, 100)
        pretty_expected = bin2hex(TEST_CLASSLESS_STATIC_ROUTE_DATA, 100)
        print("Expected to serialize %s to %s but instead got %s." %
              (repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED), pretty_expected,
               pretty_actual))
        return False
    print("test_classless_static_route_serialization PASSED")
    return True


def test_domain_search_list_parsing():
    """
    Check that a compressed domain search list unpacks into the expected
    domain names, in order.

    @return True iff the parsed domains match.
    """
    parsed_domains = dhcp_packet.DomainListOption.unpack(
            TEST_DOMAIN_SEARCH_LIST_COMPRESSED)
    # Order matters too.
    parsed_domains = tuple(parsed_domains)
    if parsed_domains != TEST_DOMAIN_SEARCH_LIST_PARSED:
        # Report the value we actually compared against, not the serialized
        # form used by the serialization test.
        print("Parsed binary domain list and got %s but expected %s" %
              (parsed_domains, TEST_DOMAIN_SEARCH_LIST_PARSED))
        return False
    print("test_domain_search_list_parsing PASSED")
    return True


def test_domain_search_list_serialization():
    """
    Check that the domain names pack into the expected (uncompressed) binary
    option data.

    @return True iff the packed bytes match.
    """
    byte_string = dhcp_packet.DomainListOption.pack(
            TEST_DOMAIN_SEARCH_LIST_PARSED)
    if byte_string != TEST_DOMAIN_SEARCH_LIST_EXPECTED:
        # Turn the strings into printable hex strings on a single line.
        pretty_actual = bin2hex(byte_string, 100)
        pretty_expected = bin2hex(TEST_DOMAIN_SEARCH_LIST_EXPECTED, 100)
        print("Expected to serialize %s to %s but instead got %s." %
              (TEST_DOMAIN_SEARCH_LIST_PARSED, pretty_expected, pretty_actual))
        return False
    print("test_domain_search_list_serialization PASSED")
    return True


def test_broken_domain_search_list_parsing():
    """
    Check that a packet carrying the domain search list split across two
    option entries is parsed into a single option holding both domains.

    @return True iff exactly one option is parsed and it holds the expected
            domains.
    """
    # 240 zero bytes stand in for everything that precedes the options; the
    # final "\xff" terminates the option list.
    byte_string = ('\x00' * 240 + TEST_DOMAIN_SEARCH_LIST1 +
                   TEST_DOMAIN_SEARCH_LIST2 + '\xff')
    packet = dhcp_packet.DhcpPacket(byte_str=byte_string)
    if len(packet._options) != 1:
        print("Expected domain list of length 1")
        return False
    for value in packet._options.values():
        if tuple(value) != TEST_DOMAIN_SEARCH_LIST_PARSED:
            print("Parsed binary domain list and got %s but expected %s" %
                  (tuple(value), TEST_DOMAIN_SEARCH_LIST_PARSED))
            return False
    print("test_broken_domain_search_list_parsing PASSED")
    return True


def receive_packet(a_socket, timeout_seconds=1.0):
    """
    Poll a socket until a datagram arrives or the deadline passes, then parse
    the datagram as a DHCP packet.

    @param a_socket: socket to read from.  It should have a short timeout set,
            since each recvfrom() call is only retried after socket.timeout.
    @param timeout_seconds: total time to keep polling for.
    @return the parsed DhcpPacket, or None on timeout or invalid packet.
    """
    data = None
    deadline = time.time() + timeout_seconds
    while data is None and time.time() < deadline:
        try:
            data, _ = a_socket.recvfrom(1024)
        except socket.timeout:
            pass  # We expect many timeouts.
    if data is None:
        print("Timed out before we received a response from the server.")
        return None

    print("Client received a packet of length %d from the server." % len(data))
    packet = dhcp_packet.DhcpPacket(byte_str=data)
    if not packet.is_valid:
        print("Received an invalid response from DHCP server.")
        return None

    return packet


def test_simple_server_exchange(server):
    """
    Play the client side of a DISCOVERY/OFFER/REQUEST/ACK exchange against a
    running test server and check each response.

    @param server: started DhcpTestServer listening on the loopback device.
    @return True iff both responses look right and the server reports that
            its test passed.
    """
    intended_ip = "127.0.0.42"
    subnet_mask = "255.255.255.0"
    server_ip = "127.0.0.1"
    lease_time_seconds = 60
    test_timeout = 3.0
    mac_addr = "\x01\x02\x03\x04\x05\x06"
    # Build up our packets and have them request some default option values,
    # like the IP we're being assigned and the address of the server assigning
    # it.
    discovery_message = dhcp_packet.DhcpPacket.create_discovery_packet(mac_addr)
    discovery_message.set_option(
            dhcp_packet.OPTION_PARAMETER_REQUEST_LIST,
            dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT)
    request_message = dhcp_packet.DhcpPacket.create_request_packet(
            discovery_message.transaction_id,
            mac_addr)
    request_message.set_option(
            dhcp_packet.OPTION_PARAMETER_REQUEST_LIST,
            dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT)
    # This is the pool of settings the DHCP server will seem to draw from to
    # answer queries from the client.  This information is written into packets
    # through the handling rules.
    dhcp_server_config = {
            dhcp_packet.OPTION_SERVER_ID : server_ip,
            dhcp_packet.OPTION_SUBNET_MASK : subnet_mask,
            dhcp_packet.OPTION_IP_LEASE_TIME : lease_time_seconds,
            dhcp_packet.OPTION_REQUESTED_IP : intended_ip,
            }
    # Build up the handling rules for the server and start the test.
    rules = []
    rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
            intended_ip,
            server_ip,
            dhcp_server_config, {}))
    rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
            intended_ip,
            server_ip,
            dhcp_server_config, {}))
    rules[-1].is_final_handler = True
    server.start_test(rules, test_timeout)
    # Because we don't want to require root permissions to run these tests,
    # listen on the loopback device, don't broadcast, and don't use reserved
    # ports (like the actual DHCP ports).  Use 8068/8067 instead.
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # try/finally rather than "with": Python 2 sockets are not context
    # managers.  This releases the port on every return path.
    try:
        client_socket.bind(("127.0.0.1", TEST_CLIENT_PORT))
        client_socket.settimeout(0.1)
        client_socket.sendto(discovery_message.to_binary_string(),
                             (server_ip, TEST_SERVER_PORT))

        offer_packet = receive_packet(client_socket)
        if offer_packet is None:
            return False

        if offer_packet.message_type != dhcp_packet.MESSAGE_TYPE_OFFER:
            print("Type of DHCP response is not offer.")
            return False

        if offer_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip:
            print("Server didn't offer the IP we expected.")
            return False

        print("Offer looks good to the client, sending request.")
        # In real tests, dhcpcd formats all the DISCOVERY and REQUEST
        # messages.  In our unit test, we have to do this ourselves by echoing
        # the offered values back in the request.
        for option in (dhcp_packet.OPTION_SERVER_ID,
                       dhcp_packet.OPTION_SUBNET_MASK,
                       dhcp_packet.OPTION_IP_LEASE_TIME,
                       dhcp_packet.OPTION_REQUESTED_IP):
            request_message.set_option(option,
                                       offer_packet.get_option(option))
        # Send the REQUEST message.
        client_socket.sendto(request_message.to_binary_string(),
                             (server_ip, TEST_SERVER_PORT))
        ack_packet = receive_packet(client_socket)
        if ack_packet is None:
            return False

        if ack_packet.message_type != dhcp_packet.MESSAGE_TYPE_ACK:
            print("Type of DHCP response is not acknowledgement.")
            return False

        if ack_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip:
            print("Server didn't give us the IP we expected.")
            return False

        print("Waiting for the server to finish.")
        server.wait_for_test_to_finish()
        print("Server agrees that the test is over.")
        if not server.last_test_passed:
            print("Server is unhappy with the test result.")
            return False
    finally:
        client_socket.close()

    print("test_simple_server_exchange PASSED.")
    return True


def test_server_dialogue():
    """
    Start a DhcpTestServer on the loopback device, run the simple exchange
    against it, and stop the server again.

    @return True iff the server came up healthy and the exchange passed.
    """
    server = dhcp_test_server.DhcpTestServer(ingress_address="127.0.0.1",
                                             ingress_port=TEST_SERVER_PORT,
                                             broadcast_address="127.0.0.1",
                                             broadcast_port=TEST_CLIENT_PORT)
    server.start()
    ret = False
    # Stop the server even if the exchange raises, so that a failing test
    # cannot leave it running.
    try:
        if server.is_healthy:
            ret = test_simple_server_exchange(server)
        else:
            print("Server isn't healthy, aborting.")
    finally:
        print("Sending server stop() signal.")
        server.stop()
        print("Stop signal sent.")
    return ret


def run_tests():
    """
    Run every test in this file with debug logging for the "dhcp" logger
    sent to the console.

    @return 0 if all tests passed, -1 otherwise (used as the exit status).
    """
    logger = logging.getLogger("dhcp")
    logger.setLevel(logging.DEBUG)
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.DEBUG)
    logger.addHandler(stream_handler)
    tests = (
            test_packet_serialization,
            test_classless_static_route_parsing,
            test_classless_static_route_serialization,
            test_domain_search_list_parsing,
            test_domain_search_list_serialization,
            test_broken_domain_search_list_parsing,
            test_server_dialogue,
            )
    retval = True
    for test in tests:
        # Deliberately no short-circuit: run every test even after a failure
        # so that all problems get reported in one pass.
        retval &= test()
    if retval:
        print("All tests PASSED.")
        return 0
    else:
        print("Some tests FAILED")
        return -1


if __name__ == "__main__":
    sys.exit(run_tests())