• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python2
2
3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7import logging
8import socket
9import sys
10import time
11
12import common
13
14from autotest_lib.client.cros import dhcp_handling_rule
15from autotest_lib.client.cros import dhcp_packet
16from autotest_lib.client.cros import dhcp_test_server
17
18TEST_DATA_PATH_PREFIX = "client/cros/dhcp_test_data/"
19
20TEST_CLASSLESS_STATIC_ROUTE_DATA = \
21        "\x12\x0a\x09\xc0\xac\x1f\x9b\x0a" \
22        "\x00\xc0\xa8\x00\xfe"
23
24TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED = [
25        (18, "10.9.192.0", "172.31.155.10"),
26        (0, "0.0.0.0", "192.168.0.254")
27        ]
28
29TEST_DOMAIN_SEARCH_LIST_COMPRESSED = \
30        "\x03eng\x06google\x03com\x00\x09marketing\xC0\x04"
31
32TEST_DOMAIN_SEARCH_LIST_PARSED = ("eng.google.com", "marketing.google.com")
33
34# At this time, we don't support the compression allowed in the RFC.
35# This is correct and sufficient for our purposes.
36TEST_DOMAIN_SEARCH_LIST_EXPECTED = \
37        "\x03eng\x06google\x03com\x00\x09marketing\x06google\x03com\x00"
38
39TEST_DOMAIN_SEARCH_LIST1 = \
40        "w\x10\x03eng\x06google\x03com\x00"
41
42TEST_DOMAIN_SEARCH_LIST2 = \
43        "w\x16\x09marketing\x06google\x03com\x00"
44
45def bin2hex(byte_str, justification=20):
46    """
47    Turn big hex strings into prettier strings of hex bytes.  Group those hex
48    bytes into lines justification bytes long.
49    """
50    chars = ["x" + (hex(ord(c))[2:].zfill(2)) for c in byte_str]
51    groups = []
52    for i in xrange(0, len(chars), justification):
53        groups.append("".join(chars[i:i+justification]))
54    return "\n".join(groups)
55
56def test_packet_serialization():
57    log_file = open(TEST_DATA_PATH_PREFIX + "dhcp_discovery.log", "rb")
58    binary_discovery_packet = log_file.read()
59    log_file.close()
60    discovery_packet = dhcp_packet.DhcpPacket(byte_str=binary_discovery_packet)
61    if not discovery_packet.is_valid:
62        return False
63    generated_string = discovery_packet.to_binary_string()
64    if generated_string is None:
65        print "Failed to generate string from packet object."
66        return False
67    if generated_string != binary_discovery_packet:
68        print "Packets didn't match: "
69        print "Generated: \n%s" % bin2hex(generated_string)
70        print "Expected: \n%s" % bin2hex(binary_discovery_packet)
71        return False
72    print "test_packet_serialization PASSED"
73    return True
74
75def test_classless_static_route_parsing():
76    parsed_routes = dhcp_packet.ClasslessStaticRoutesOption.unpack(
77            TEST_CLASSLESS_STATIC_ROUTE_DATA)
78    if parsed_routes != TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED:
79        print ("Parsed binary domain list and got %s but expected %s" %
80               (repr(parsed_routes),
81                repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED)))
82        return False
83    print "test_classless_static_route_parsing PASSED"
84    return True
85
86def test_classless_static_route_serialization():
87    byte_string = dhcp_packet.ClasslessStaticRoutesOption.pack(
88            TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED)
89    if byte_string != TEST_CLASSLESS_STATIC_ROUTE_DATA:
90        # Turn the strings into printable hex strings on a single line.
91        pretty_actual = bin2hex(byte_string, 100)
92        pretty_expected = bin2hex(TEST_CLASSLESS_STATIC_ROUTE_DATA, 100)
93        print ("Expected to serialize %s to %s but instead got %s." %
94               (repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED), pretty_expected,
95                     pretty_actual))
96        return False
97    print "test_classless_static_route_serialization PASSED"
98    return True
99
100def test_domain_search_list_parsing():
101    parsed_domains = dhcp_packet.DomainListOption.unpack(
102            TEST_DOMAIN_SEARCH_LIST_COMPRESSED)
103    # Order matters too.
104    parsed_domains = tuple(parsed_domains)
105    if parsed_domains != TEST_DOMAIN_SEARCH_LIST_PARSED:
106        print ("Parsed binary domain list and got %s but expected %s" %
107               (parsed_domains, TEST_DOMAIN_SEARCH_LIST_EXPECTED))
108        return False
109    print "test_domain_search_list_parsing PASSED"
110    return True
111
112def test_domain_search_list_serialization():
113    byte_string = dhcp_packet.DomainListOption.pack(
114            TEST_DOMAIN_SEARCH_LIST_PARSED)
115    if byte_string != TEST_DOMAIN_SEARCH_LIST_EXPECTED:
116        # Turn the strings into printable hex strings on a single line.
117        pretty_actual = bin2hex(byte_string, 100)
118        pretty_expected = bin2hex(TEST_DOMAIN_SEARCH_LIST_EXPECTED, 100)
119        print ("Expected to serialize %s to %s but instead got %s." %
120               (TEST_DOMAIN_SEARCH_LIST_PARSED, pretty_expected, pretty_actual))
121        return False
122    print "test_domain_search_list_serialization PASSED"
123    return True
124
125def test_broken_domain_search_list_parsing():
126    byte_string = '\x00' * 240 + TEST_DOMAIN_SEARCH_LIST1 + TEST_DOMAIN_SEARCH_LIST2 + '\xff'
127    packet = dhcp_packet.DhcpPacket(byte_str=byte_string)
128    if len(packet._options) != 1:
129        print "Expected domain list of length 1"
130        return False
131    for k, v in packet._options.items():
132        if tuple(v) != TEST_DOMAIN_SEARCH_LIST_PARSED:
133            print ("Expected binary domain list and got %s but expected %s" %
134                    (tuple(v), TEST_DOMAIN_SEARCH_LIST_PARSED))
135            return False
136    print "test_broken_domain_search_list_parsing PASSED"
137    return True
138
139def receive_packet(a_socket, timeout_seconds=1.0):
140    data = None
141    start_time = time.time()
142    while data is None and start_time + timeout_seconds > time.time():
143        try:
144            data, _ = a_socket.recvfrom(1024)
145        except socket.timeout:
146            pass # We expect many timeouts.
147    if data is None:
148        print "Timed out before we received a response from the server."
149        return None
150
151    print "Client received a packet of length %d from the server." % len(data)
152    packet = dhcp_packet.DhcpPacket(byte_str=data)
153    if not packet.is_valid:
154        print "Received an invalid response from DHCP server."
155        return None
156
157    return packet
158
159def test_simple_server_exchange(server):
160    intended_ip = "127.0.0.42"
161    subnet_mask = "255.255.255.0"
162    server_ip = "127.0.0.1"
163    lease_time_seconds = 60
164    test_timeout = 3.0
165    mac_addr = "\x01\x02\x03\x04\x05\x06"
166    # Build up our packets and have them request some default option values,
167    # like the IP we're being assigned and the address of the server assigning
168    # it.
169    discovery_message = dhcp_packet.DhcpPacket.create_discovery_packet(mac_addr)
170    discovery_message.set_option(
171            dhcp_packet.OPTION_PARAMETER_REQUEST_LIST,
172            dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT)
173    request_message = dhcp_packet.DhcpPacket.create_request_packet(
174            discovery_message.transaction_id,
175            mac_addr)
176    request_message.set_option(
177            dhcp_packet.OPTION_PARAMETER_REQUEST_LIST,
178            dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT)
179    # This is the pool of settings the DHCP server will seem to draw from to
180    # answer queries from the client.  This information is written into packets
181    # through the handling rules.
182    dhcp_server_config = {
183            dhcp_packet.OPTION_SERVER_ID : server_ip,
184            dhcp_packet.OPTION_SUBNET_MASK : subnet_mask,
185            dhcp_packet.OPTION_IP_LEASE_TIME : lease_time_seconds,
186            dhcp_packet.OPTION_REQUESTED_IP : intended_ip,
187            }
188    # Build up the handling rules for the server and start the test.
189    rules = []
190    rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
191            intended_ip,
192            server_ip,
193            dhcp_server_config, {}))
194    rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
195            intended_ip,
196            server_ip,
197            dhcp_server_config, {}))
198    rules[-1].is_final_handler = True
199    server.start_test(rules, test_timeout)
200    # Because we don't want to require root permissions to run these tests,
201    # listen on the loopback device, don't broadcast, and don't use reserved
202    # ports (like the actual DHCP ports).  Use 8068/8067 instead.
203    client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
204    client_socket.bind(("127.0.0.1", 8068))
205    client_socket.settimeout(0.1)
206    client_socket.sendto(discovery_message.to_binary_string(),
207                         (server_ip, 8067))
208
209    offer_packet = receive_packet(client_socket)
210    if offer_packet is None:
211        return False
212
213    if (offer_packet.message_type != dhcp_packet.MESSAGE_TYPE_OFFER):
214        print "Type of DHCP response is not offer."
215        return False
216
217    if offer_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip:
218        print "Server didn't offer the IP we expected."
219        return False
220
221    print "Offer looks good to the client, sending request."
222    # In real tests, dhcpcd formats all the DISCOVERY and REQUEST messages.  In
223    # our unit test, we have to do this ourselves.
224    request_message.set_option(
225            dhcp_packet.OPTION_SERVER_ID,
226            offer_packet.get_option(dhcp_packet.OPTION_SERVER_ID))
227    request_message.set_option(
228            dhcp_packet.OPTION_SUBNET_MASK,
229            offer_packet.get_option(dhcp_packet.OPTION_SUBNET_MASK))
230    request_message.set_option(
231            dhcp_packet.OPTION_IP_LEASE_TIME,
232            offer_packet.get_option(dhcp_packet.OPTION_IP_LEASE_TIME))
233    request_message.set_option(
234            dhcp_packet.OPTION_REQUESTED_IP,
235            offer_packet.get_option(dhcp_packet.OPTION_REQUESTED_IP))
236    # Send the REQUEST message.
237    client_socket.sendto(request_message.to_binary_string(),
238                         (server_ip, 8067))
239    ack_packet = receive_packet(client_socket)
240    if ack_packet is None:
241        return False
242
243    if (ack_packet.message_type != dhcp_packet.MESSAGE_TYPE_ACK):
244        print "Type of DHCP response is not acknowledgement."
245        return False
246
247    if ack_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip:
248        print "Server didn't give us the IP we expected."
249        return False
250
251    print "Waiting for the server to finish."
252    server.wait_for_test_to_finish()
253    print "Server agrees that the test is over."
254    if not server.last_test_passed:
255        print "Server is unhappy with the test result."
256        return False
257
258    print "test_simple_server_exchange PASSED."
259    return True
260
261def test_server_dialogue():
262    server = dhcp_test_server.DhcpTestServer(ingress_address="127.0.0.1",
263                                             ingress_port=8067,
264                                             broadcast_address="127.0.0.1",
265                                             broadcast_port=8068)
266    server.start()
267    ret = False
268    if server.is_healthy:
269        ret = test_simple_server_exchange(server)
270    else:
271        print "Server isn't healthy, aborting."
272    print "Sending server stop() signal."
273    server.stop()
274    print "Stop signal sent."
275    return ret
276
277def run_tests():
278    logger = logging.getLogger("dhcp")
279    logger.setLevel(logging.DEBUG)
280    stream_handler = logging.StreamHandler()
281    stream_handler.setLevel(logging.DEBUG)
282    logger.addHandler(stream_handler)
283    retval = test_packet_serialization()
284    retval &= test_classless_static_route_parsing()
285    retval &= test_classless_static_route_serialization()
286    retval &= test_domain_search_list_parsing()
287    retval &= test_domain_search_list_serialization()
288    retval &= test_broken_domain_search_list_parsing()
289    retval &= test_server_dialogue()
290    if retval:
291        print "All tests PASSED."
292        return 0
293    else:
294        print "Some tests FAILED"
295        return -1
296
297if __name__ == "__main__":
298    sys.exit(run_tests())
299