from acts import asserts
from acts import base_test
from acts import signals
from acts.controllers import android_device
from acts.test_decorators import test_tracker_info

from scapy.all import *
from threading import Event
from threading import Thread
import random
import time
import warnings


# DHCP client / server UDP ports
CLIENT_PORT = 68
SERVER_PORT = 67
BROADCAST_MAC = 'ff:ff:ff:ff:ff:ff'
INET4_ANY = '0.0.0.0'
# Prefix the tests expect the tethering DHCP server to hand out
NETADDR_PREFIX = '192.168.42.'
# A prefix outside of the served subnet, used for negative tests
OTHER_NETADDR_PREFIX = '192.168.43.'
NETADDR_BROADCAST = '255.255.255.255'
SUBNET_BROADCAST = NETADDR_PREFIX + '255'
# adb shell commands used to switch the USB function / inspect interfaces
USB_CHARGE_MODE = 'svc usb setFunctions'
USB_TETHERING_MODE = 'svc usb setFunctions rndis'
DEVICE_IP_ADDRESS = 'ip address'


# DHCP message-type values checked by the tests
OFFER = 2
REQUEST = 3
ACK = 5
NAK = 6


class DhcpServerTest(base_test.BaseTestClass):
    """Tests for the DHCP server reachable over the DUT's USB tethering link.

    Packets are crafted and parsed with scapy on the host interface that
    appears once USB tethering is enabled on the device.
    """

    def setup_class(self):
        """Enable USB tethering and discover the server address and DUT MAC."""
        self.dut = self.android_devices[0]
        self.USB_TETHERED = False
        self.next_hwaddr_index = 0
        self.stop_arp = Event()

        conf.checkIPaddr = 0
        conf.checkIPsrc = 0
        # Allow using non-67 server ports as long as client uses 68
        bind_layers(UDP, BOOTP, dport=CLIENT_PORT)

        iflist_before = get_if_list()
        self._start_usb_tethering(self.dut)
        self.iface = self._wait_for_new_iface(iflist_before)
        self.real_hwaddr = get_if_raw_hwaddr(self.iface)

        # Start a thread to answer to all ARP "who-has"
        thread = Thread(target=self._sniff_arp, args=(self.stop_arp,))
        thread.start()

        # Discover server IP and device hwaddr
        hwaddr = self._next_hwaddr()
        resp = self._get_response(self._make_discover(hwaddr))
        asserts.assert_false(
            resp is None, "Device did not reply to first DHCP discover")
        self.server_addr = getopt(resp, 'server_id')
        self.dut_hwaddr = resp.getlayer(Ether).src
        asserts.assert_false(
            self.server_addr is None,
            "DHCP server did not specify server identifier")
        # Ensure that we don't depend on assigned route/gateway on the host
        conf.route.add(host=self.server_addr, dev=self.iface, gw=INET4_ANY)

    def setup_test(self):
        """Reset per-test state: fresh hardware addresses, no pending releases."""
        # Some versions of scapy do not close the receive file properly
        warnings.filterwarnings("ignore", category=ResourceWarning)

        bind_layers(UDP, BOOTP, dport=CLIENT_PORT)
        self.hwaddr = self._next_hwaddr()
        self.other_hwaddr = self._next_hwaddr()
        self.cleanup_releases = []

    def teardown_test(self):
        """Send every packet queued in self.cleanup_releases."""
        for packet in self.cleanup_releases:
            self._send(packet)

    def teardown_class(self):
        """Ask the ARP responder to stop and turn USB tethering off."""
        self.stop_arp.set()
        self._stop_usb_tethering(self.dut)

    def on_fail(self, test_name, begin_time):
        """Capture a bug report from the DUT when a test fails."""
        self.dut.take_bug_report(test_name, begin_time)

    def _start_usb_tethering(self, dut):
        """ Start USB tethering

        Args:
            1. dut - ad object

        Raises:
            signals.TestFailure: no 'rndis' entry shows up in the device's
                `ip address` output after switching the USB function.
        """
        self.log.info("Starting USB Tethering")
        dut.stop_services()
        dut.adb.shell(USB_TETHERING_MODE, ignore_status=True)
        dut.adb.wait_for_device()
        dut.start_services()
        if 'rndis' not in dut.adb.shell(DEVICE_IP_ADDRESS):
            raise signals.TestFailure('Unable to enable USB tethering.')
        self.USB_TETHERED = True

    def _stop_usb_tethering(self, dut):
        """ Stop USB tethering

        Args:
            1. dut - ad object
        """
        self.log.info("Stopping USB Tethering")
        dut.stop_services()
        dut.adb.shell(USB_CHARGE_MODE)
        dut.adb.wait_for_device()
        dut.start_services()
        self.USB_TETHERED = False

    def _wait_for_device(self, dut):
        """ Wait for device to come back online

        Args:
            1. dut - ad object
        """
        while dut.serial not in android_device.list_adb_devices():
            # Pause between polls instead of spinning on the adb device list.
            time.sleep(0.5)
        dut.adb.wait_for_device()

    def _wait_for_new_iface(self, old_ifaces):
        """Return the single host interface that was not in old_ifaces.

        Args:
            old_ifaces: interface names listed before tethering was enabled.

        Fails the test if more than one new interface appears, or if none
        appears within the polling window.
        """
        old_set = set(old_ifaces)
        # Try 10 times to find a new interface with a 1s sleep every time
        # (equivalent to a 9s timeout)
        for _ in range(10):
            new_ifaces = set(get_if_list()) - old_set
            asserts.assert_true(
                len(new_ifaces) < 2,
                "Too many new interfaces after turning on tethering")
            if len(new_ifaces) == 1:
                return new_ifaces.pop()
            time.sleep(1)
        asserts.fail("Timeout waiting for tethering interface on host")

    def _sniff_arp(self, stop_arp):
        """Thread body: sniff ARP on the tethering interface and answer it.

        Args:
            stop_arp: Event; once set, the sniff stops at the next packet and
                errors raised by sniff are swallowed.
        """
        try:
            sniff(iface=self.iface, filter='arp', prn=self._handle_arp,
                  store=0, stop_filter=lambda p: stop_arp.is_set())
        except Exception:
            # sniff may raise when tethering is disconnected. Ignore
            # exceptions if stop was requested.
            if not stop_arp.is_set():
                raise

    def _handle_arp(self, packet):
        """Answer an ARP who-has with the host's real hardware address."""
        # Reply to all arp "who-has": say we have everything
        if packet[ARP].op == ARP.who_has:
            reply = ARP(op=ARP.is_at, hwsrc=self.real_hwaddr,
                        psrc=packet.pdst, hwdst=BROADCAST_MAC,
                        pdst=SUBNET_BROADCAST)
            sendp(Ether(dst=BROADCAST_MAC, src=self.real_hwaddr) / reply,
                  iface=self.iface, verbose=False)

    @test_tracker_info(uuid="a8712769-977a-4ee1-902f-90b3ba30b40c")
    def test_config_assumptions(self):
        """The offered address starts with NETADDR_PREFIX."""
        resp = self._get_response(self._make_discover(self.hwaddr))
        asserts.assert_false(resp is None, "Device did not reply to discover")
        asserts.assert_true(get_yiaddr(resp).startswith(NETADDR_PREFIX),
                            "Server does not use expected prefix")

    @test_tracker_info(uuid="e3761689-7d64-46b1-97ce-15f315eaf568")
    def test_discover_broadcastbit(self):
        """A discover with the broadcast bit set gets a broadcast offer."""
        resp = self._get_response(
            self._make_discover(self.hwaddr, bcastbit=True))
        self._assert_offer(resp)
        self._assert_broadcast(resp)

    @test_tracker_info(uuid="30a7ea7c-c20f-4c46-aaf2-96f19d8f8191")
    def test_discover_bootpfields(self):
        """Check the BOOTP header fields of the offer to a plain discover."""
        discover = self._make_discover(self.hwaddr)
        resp = self._get_response(discover)
        self._assert_offer(resp)
        self._assert_unicast(resp)
        bootp = assert_bootp_response(resp, discover)
        asserts.assert_equal(INET4_ANY, bootp.ciaddr)
        asserts.assert_equal(self.server_addr, bootp.siaddr)
        asserts.assert_equal(INET4_ANY, bootp.giaddr)
        asserts.assert_equal(self.hwaddr, get_chaddr(bootp))

    @test_tracker_info(uuid="593f4051-516d-44fa-8834-7d485362f182")
    def test_discover_relayed_broadcastbit(self):
        """A relayed discover with the broadcast bit gets a relayed offer."""
        giaddr = NETADDR_PREFIX + '123'
        resp = self._get_response(
            self._make_discover(self.hwaddr, giaddr=giaddr, bcastbit=True))
        self._assert_offer(resp)
        self._assert_relayed(resp, giaddr)
        self._assert_broadcastbit(resp)

    def _run_discover_paramrequestlist(self, params, unwanted_params):
        """Send a discover carrying a parameter request list and check it.

        Args:
            params: option labels to request; all must appear in the offer,
                in this order.
            unwanted_params: option labels that must not appear in the offer.

        Returns:
            The offer received from the server.
        """
        params_opt = make_paramrequestlist_opt(params)
        resp = self._get_response(
            self._make_discover(self.hwaddr, options=[params_opt]))

        self._assert_offer(resp)
        # List of requested params in response order
        resp_opts = get_opt_labels(resp)
        resp_requested_opts = [opt for opt in resp_opts if opt in params]
        # All above params should be supported, order should be conserved
        asserts.assert_equal(params, resp_requested_opts)
        asserts.assert_equal(0, len(set(resp_opts) & set(unwanted_params)))
        return resp

    @test_tracker_info(uuid="00a8a3f6-f143-47ff-a79b-482c607fb5b8")
    def test_discover_paramrequestlist(self):
        """Requested options are returned with values in the served subnet."""
        resp = self._run_discover_paramrequestlist(
            ['subnet_mask', 'broadcast_address', 'router', 'name_server'],
            unwanted_params=[])
        for opt in ['broadcast_address', 'router', 'name_server']:
            asserts.assert_true(
                getopt(resp, opt).startswith(NETADDR_PREFIX),
                opt + ' does not start with ' + NETADDR_PREFIX)

        subnet_mask = getopt(resp, 'subnet_mask')
        asserts.assert_true(
            subnet_mask.startswith('255.255.'),
            'Unexpected subnet mask for /16+: ' + subnet_mask)

    @test_tracker_info(uuid="d1aad4a3-9eab-4900-aa6a-5b82a4a64f46")
    def test_discover_paramrequestlist_rev(self):
        """Skipped: options requested in reverse order."""
        # RFC2132 #9.8: "The DHCP server is not required to return the options
        # in the requested order, but MUST try to insert the requested options
        # in the order requested"
        asserts.skip('legacy behavior not compliant: fixed order used')
        self._run_discover_paramrequestlist(
            ['name_server', 'router', 'broadcast_address', 'subnet_mask'],
            unwanted_params=[])

    @test_tracker_info(uuid="e3ae6335-8cc7-4bf1-bb58-67646b727f2b")
    def test_discover_paramrequestlist_unwanted(self):
        """Skipped: options that were not requested must not be returned."""
        asserts.skip('legacy behavior always sends all parameters')
        self._run_discover_paramrequestlist(
            ['router', 'name_server'],
            unwanted_params=['broadcast_address', 'subnet_mask'])

    def _assert_renews(self, request, addr, exp_time, resp_type=ACK):
        """Send request after a 3s pause and check the lease was renewed.

        Args:
            request: packet to send.
            addr: address the response must assign (yiaddr).
            exp_time: lease time expected in the response, +/- 2.
            resp_type: expected DHCP message type of the response.

        Returns:
            The response received from the server.
        """
        # Sleep to test lease time renewal
        time.sleep(3)
        resp = self._get_response(request)
        self._assert_type(resp, resp_type)
        asserts.assert_equal(addr, get_yiaddr(resp))
        remaining_lease = getopt(resp, 'lease_time')
        # Lease renewed: waited for 3s, lease time not decreased by more than 2
        asserts.assert_true(remaining_lease >= exp_time - 2,
                            'Lease not renewed')
        # Lease times should be consistent across offers/renewals
        asserts.assert_true(remaining_lease <= exp_time + 2,
                            'Lease time inconsistent')
        return resp

    @test_tracker_info(uuid="d6b598b7-f443-4b5a-ba80-4af5d211cade")
    def test_discover_assigned_ownaddress(self):
        """A new discover from a client with a lease is offered the same
        address with a consistent lease time."""
        addr, siaddr, resp = self._request_address(self.hwaddr)

        lease_time = getopt(resp, 'lease_time')
        server_id = getopt(resp, 'server_id')
        asserts.assert_true(lease_time >= 60, "Lease time is too short")
        asserts.assert_false(addr == INET4_ANY, "Assigned address is empty")
        # Wait to test lease expiration time change
        time.sleep(3)

        # New discover, same address
        resp = self._assert_renews(self._make_discover(self.hwaddr),
                                   addr, lease_time, resp_type=OFFER)
        self._assert_unicast(resp, get_yiaddr(resp))
        self._assert_broadcastbit(resp, isset=False)

    @test_tracker_info(uuid="cbb07d77-912b-4269-bbbc-adba99779587")
    def test_discover_assigned_otherhost(self):
        """An address leased to one client is not offered to another."""
        addr, siaddr, _ = self._request_address(self.hwaddr)

        # New discover, same address, different client
        resp = self._get_response(self._make_discover(
            self.other_hwaddr, [('requested_addr', addr)]))

        self._assert_offer(resp)
        asserts.assert_false(get_yiaddr(resp) == addr,
                             "Already assigned address offered")
        self._assert_unicast(resp, get_yiaddr(resp))
        self._assert_broadcastbit(resp, isset=False)

    @test_tracker_info(uuid="3d2b3d2f-eb5f-498f-b887-3b4638cebf14")
    def test_discover_requestaddress(self):
        """A requested address is offered, and offered again to another
        client as long as it was not committed."""
        addr = NETADDR_PREFIX + '200'
        resp = self._get_response(self._make_discover(
            self.hwaddr, [('requested_addr', addr)]))
        self._assert_offer(resp)
        asserts.assert_equal(get_yiaddr(resp), addr)

        # Lease not committed: can request again
        resp = self._get_response(self._make_discover(
            self.other_hwaddr, [('requested_addr', addr)]))
        self._assert_offer(resp)
        asserts.assert_equal(get_yiaddr(resp), addr)

    @test_tracker_info(uuid="5ffd9d25-304e-434b-bedb-56ccf27dcebd")
    def test_discover_requestaddress_wrongsubnet(self):
        """A requested address outside the subnet is not offered."""
        addr = OTHER_NETADDR_PREFIX + '200'
        resp = self._get_response(
            self._make_discover(self.hwaddr, [('requested_addr', addr)]))
        self._assert_offer(resp)
        self._assert_unicast(resp)
        asserts.assert_false(get_yiaddr(resp) == addr,
                             'Server offered invalid address')

    @test_tracker_info(uuid="f7d6a92f-9386-4b65-b6c1-d0a3f11213bf")
    def test_discover_giaddr_outside_subnet(self):
        """A discover relayed from outside the subnet gets no response."""
        giaddr = OTHER_NETADDR_PREFIX + '201'
        resp = self._get_response(
            self._make_discover(self.hwaddr, giaddr=giaddr))
        asserts.assert_equal(resp, None)

    @test_tracker_info(uuid="1348c79a-9203-4bb8-b33b-af80bacd17b1")
    def test_discover_srcaddr_outside_subnet(self):
        """A discover with an out-of-subnet IP source still gets an offer,
        for an address other than that source."""
        srcaddr = OTHER_NETADDR_PREFIX + '200'
        resp = self._get_response(
            self._make_discover(self.hwaddr, ip_src=srcaddr))
        self._assert_offer(resp)
        asserts.assert_false(srcaddr == get_yiaddr(resp),
                             'Server offered invalid address')

    @test_tracker_info(uuid="a03bb783-8665-4c66-9c0c-1bb02ddca07e")
    def test_discover_requestaddress_giaddr_outside_subnet(self):
        """A valid requested address via an out-of-subnet relay gets no
        response."""
        addr = NETADDR_PREFIX + '200'
        giaddr = OTHER_NETADDR_PREFIX + '201'
        req = self._make_discover(self.hwaddr, [('requested_addr', addr)],
                                  ip_src=giaddr, giaddr=giaddr)
        resp = self._get_response(req)
        asserts.assert_equal(resp, None)

    @test_tracker_info(uuid="725956af-71e2-45d8-b8b3-402d21bfc7db")
    def test_discover_knownaddress_giaddr_outside_subnet(self):
        """A known client relayed from outside the subnet gets no response."""
        addr, siaddr, _ = self._request_address(self.hwaddr)

        # New discover, same client, through relay in invalid subnet
        giaddr = OTHER_NETADDR_PREFIX + '200'
        resp = self._get_response(
            self._make_discover(self.hwaddr, giaddr=giaddr))
        asserts.assert_equal(resp, None)

    @test_tracker_info(uuid="2ee9d5b1-c15d-40c4-98e9-63202d1f1557")
    def test_discover_knownaddress_giaddr_valid_subnet(self):
        """A known client relayed from inside the subnet gets an offer
        unicast to the relay address."""
        addr, siaddr, _ = self._request_address(self.hwaddr)

        # New discover, same client, through relay in valid subnet
        giaddr = NETADDR_PREFIX + '200'
        resp = self._get_response(
            self._make_discover(self.hwaddr, giaddr=giaddr))
        self._assert_offer(resp)
        self._assert_unicast(resp, giaddr)
        self._assert_broadcastbit(resp, isset=False)

    @test_tracker_info(uuid="f43105a5-633a-417a-8a07-39bc36c493e7")
    def test_request_unicast(self):
        """An address requested with bcast=False is answered by unicast."""
        addr, siaddr, resp = self._request_address(self.hwaddr, bcast=False)
        self._assert_unicast(resp, addr)

    @test_tracker_info(uuid="09f3c1c4-1202-4f85-a965-4d86aee069f3")
    def test_request_bootpfields(self):
        """Check the BOOTP header fields of the ack to a request."""
        req_addr = NETADDR_PREFIX + '200'
        req = self._make_request(self.hwaddr, req_addr, self.server_addr)
        resp = self._get_response(req)
        self._assert_ack(resp)
        bootp = assert_bootp_response(resp, req)
        asserts.assert_equal(INET4_ANY, bootp.ciaddr)
        asserts.assert_equal(self.server_addr, bootp.siaddr)
        asserts.assert_equal(INET4_ANY, bootp.giaddr)
        asserts.assert_equal(self.hwaddr, get_chaddr(bootp))

    @test_tracker_info(uuid="ec00d268-80cb-4be5-9771-2292cc7d2e18")
    def test_request_selecting_inuse(self):
        """Requesting an address leased to another client yields a
        broadcast NAK with empty address fields."""
        addr, siaddr, _ = self._request_address(self.hwaddr)
        new_req = self._make_request(self.other_hwaddr, addr, siaddr)
        resp = self._get_response(new_req)
        self._assert_nak(resp)
        self._assert_broadcast(resp)
        bootp = assert_bootp_response(resp, new_req)
        asserts.assert_equal(INET4_ANY, bootp.ciaddr)
        asserts.assert_equal(INET4_ANY, bootp.yiaddr)
        asserts.assert_equal(INET4_ANY, bootp.siaddr)
        asserts.assert_equal(INET4_ANY, bootp.giaddr)
        asserts.assert_equal(self.other_hwaddr, get_chaddr(bootp))
        asserts.assert_equal(
            ['message-type', 'server_id', 56, 'end'],  # 56 is "message" opt
            get_opt_labels(bootp))
        asserts.assert_equal(self.server_addr, getopt(bootp, 'server_id'))

    @test_tracker_info(uuid="0643c179-3542-4297-9b06-8d86ff785e9c")
    def test_request_selecting_wrongsiaddr(self):
        """A request naming another server gets no response."""
        addr = NETADDR_PREFIX + '200'
        wrong_siaddr = NETADDR_PREFIX + '201'
        asserts.assert_false(
            wrong_siaddr == self.server_addr,
            'Test assumption not met: server addr is ' + wrong_siaddr)
        resp = self._get_response(
            self._make_request(self.hwaddr, addr, siaddr=wrong_siaddr))
        asserts.assert_true(
            resp is None,
            'Received response for request with incorrect siaddr')

    @test_tracker_info(uuid="676beab2-4af8-4bf0-a4ad-c7626ae5987f")
    def test_request_selecting_giaddr_outside_subnet(self):
        """A request relayed from outside the subnet gets no response."""
        addr = NETADDR_PREFIX + '200'
        giaddr = OTHER_NETADDR_PREFIX + '201'
        resp = self._get_response(
            self._make_request(self.hwaddr, addr, siaddr=self.server_addr,
                               giaddr=giaddr))
        asserts.assert_equal(resp, None)

    @test_tracker_info(uuid="fe17df0c-2f41-416f-bb76-d75b74b63c0f")
    def test_request_selecting_hostnameupdate(self):
        """The same request is acked before and after its hostname changes."""
        addr = NETADDR_PREFIX + '123'
        hostname1 = b'testhostname1'
        hostname2 = b'testhostname2'
        req = self._make_request(self.hwaddr, None, None,
                                 options=[
                                     ('requested_addr', addr),
                                     ('server_id', self.server_addr),
                                     ('hostname', hostname1)])
        resp = self._get_response(req)
        self._assert_ack(resp)
        self._assert_unicast(resp, addr)
        # NOTE(review): this reads the hostname back from the request, not
        # from the response - confirm that is the intended check.
        asserts.assert_equal(hostname1, getopt(req, 'hostname'))

        # Re-request with different hostname
        setopt(req, 'hostname', hostname2)
        resp = self._get_response(req)
        self._assert_ack(resp)
        self._assert_unicast(resp, addr)
        asserts.assert_equal(hostname2, getopt(req, 'hostname'))

    def _run_initreboot(self, bcastbit):
        """Obtain a lease, then re-request it without a server identifier.

        Args:
            bcastbit: whether the re-request sets the broadcast bit.

        Returns:
            The response to the re-request, already checked by
            _assert_renews.
        """
        addr, siaddr, resp = self._request_address(self.hwaddr)
        exp = getopt(resp, 'lease_time')

        # init-reboot: siaddr is None
        return self._assert_renews(
            self._make_request(self.hwaddr, addr, siaddr=None,
                               bcastbit=bcastbit),
            addr, exp)

    @test_tracker_info(uuid="263c91b9-cfe9-4f21-985d-b7046df80528")
    def test_request_initreboot(self):
        """init-reboot without the broadcast bit is answered by unicast."""
        resp = self._run_initreboot(bcastbit=False)
        self._assert_unicast(resp)
        self._assert_broadcastbit(resp, isset=False)

    @test_tracker_info(uuid="f05dd60f-03dd-4e2b-8e58-80f4d752ad51")
    def test_request_initreboot_broadcastbit(self):
        """init-reboot with the broadcast bit is answered by broadcast."""
        resp = self._run_initreboot(bcastbit=True)
        self._assert_broadcast(resp)

    @test_tracker_info(uuid="5563c616-2136-47f6-9151-4e28cbfe797c")
    def test_request_initreboot_nolease(self):
        """Skipped: init-reboot for an address without a lease."""
        # RFC2131 #4.3.2
        asserts.skip("legacy behavior not compliant")
        addr = NETADDR_PREFIX + '123'
        resp = self._get_response(self._make_request(self.hwaddr, addr, None))
        asserts.assert_equal(resp, None)

    @test_tracker_info(uuid="da5c5537-cb38-4a2e-828f-44bc97976fe5")
    def test_request_initreboot_incorrectlease(self):
        """init-reboot for an address other than the leased one is NAKed."""
        otheraddr = NETADDR_PREFIX + '123'
        addr, siaddr, _ = self._request_address(self.hwaddr)
        asserts.assert_false(
            addr == otheraddr,
            "Test assumption not met: server assigned " + otheraddr)

        resp = self._get_response(
            self._make_request(self.hwaddr, otheraddr, siaddr=None))
        self._assert_nak(resp)
        self._assert_broadcast(resp)

    @test_tracker_info(uuid="ce42ba57-07be-427b-9cbd-5535c62b0120")
    def test_request_initreboot_wrongnet(self):
        """init-reboot for an address in another subnet is NAKed."""
        resp = self._get_response(self._make_request(
            self.hwaddr, OTHER_NETADDR_PREFIX + '1', siaddr=None))
        self._assert_nak(resp)
        self._assert_broadcast(resp)

    def _run_rebinding(self, bcastbit, giaddr=INET4_ANY):
        """Obtain a lease, then renew it with a rebinding-style request.

        Args:
            bcastbit: whether the rebinding request sets the broadcast bit.
            giaddr: relay address placed in the request.

        Returns:
            A (response, leased address) tuple.
        """
        addr, siaddr, resp = self._request_address(self.hwaddr)
        exp = getopt(resp, 'lease_time')

        # Rebinding: no siaddr or reqaddr
        resp = self._assert_renews(
            self._make_request(self.hwaddr, reqaddr=None, siaddr=None,
                               ciaddr=addr, giaddr=giaddr, ip_src=addr,
                               ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit),
            addr, exp)
        return resp, addr

    @test_tracker_info(uuid="68bfcb25-5873-41ad-ad0a-bf22781534ca")
    def test_request_rebinding(self):
        """Rebinding is answered by unicast to the leased address."""
        resp, addr = self._run_rebinding(bcastbit=False)
        self._assert_unicast(resp, addr)
        self._assert_broadcastbit(resp, isset=False)

    @test_tracker_info(uuid="4c591536-8062-40ec-ae12-1ebe7dcad8e2")
    def test_request_rebinding_relayed(self):
        """Rebinding through a relay is answered via that relay."""
        giaddr = NETADDR_PREFIX + '123'
        resp, _ = self._run_rebinding(bcastbit=False, giaddr=giaddr)
        self._assert_relayed(resp, giaddr)
        self._assert_broadcastbit(resp, isset=False)

    @test_tracker_info(uuid="cee2668b-bd79-47d7-b358-8f9387d715b1")
    def test_request_rebinding_inuse(self):
        """Rebinding to an address leased to another client is NAKed."""
        addr, siaddr, _ = self._request_address(self.hwaddr)

        resp = self._get_response(self._make_request(
            self.other_hwaddr, reqaddr=None, siaddr=None, ciaddr=addr))
        self._assert_nak(resp)
        self._assert_broadcast(resp)

    @test_tracker_info(uuid="d95d69b5-ab9a-42f5-8dd0-b9b6a6d960cc")
    def test_request_rebinding_wrongaddr(self):
        """Rebinding with a ciaddr other than the leased address is NAKed."""
        otheraddr = NETADDR_PREFIX + '123'
        addr, siaddr, _ = self._request_address(self.hwaddr)
        asserts.assert_false(
            addr == otheraddr,
            "Test assumption not met: server assigned " + otheraddr)

        resp = self._get_response(self._make_request(
            self.hwaddr, reqaddr=None, siaddr=siaddr, ciaddr=otheraddr))
        self._assert_nak(resp)
        self._assert_broadcast(resp)

    @test_tracker_info(uuid="421a86b3-8779-4910-8050-7806536efabb")
    def test_request_rebinding_wrongaddr_relayed(self):
        """A relayed rebinding with a wrong ciaddr gets a relayed NAK."""
        otheraddr = NETADDR_PREFIX + '123'
        relayaddr = NETADDR_PREFIX + '124'
        addr, siaddr, _ = self._request_address(self.hwaddr)
        asserts.assert_false(
            addr == otheraddr,
            "Test assumption not met: server assigned " + otheraddr)
        asserts.assert_false(
            addr == relayaddr,
            "Test assumption not met: server assigned " + relayaddr)

        req = self._make_request(self.hwaddr, reqaddr=None, siaddr=None,
                                 ciaddr=otheraddr, giaddr=relayaddr)

        resp = self._get_response(req)
        self._assert_nak(resp)
        self._assert_relayed(resp, relayaddr)
        self._assert_broadcastbit(resp)

    @test_tracker_info(uuid="6ff1fab4-009a-4758-9153-0d9db63423da")
    def test_release(self):
        """A leased address can be requested by another client only after
        it has been released."""
        addr, siaddr, _ = self._request_address(self.hwaddr)
        # Re-requesting fails
        resp = self._get_response(
            self._make_request(self.other_hwaddr, addr, siaddr))
        self._assert_nak(resp)
        self._assert_broadcast(resp)

        # Succeeds after release
        self._send(self._make_release(self.hwaddr, addr, siaddr))
        resp = self._get_response(
            self._make_request(self.other_hwaddr, addr, siaddr))
        self._assert_ack(resp)

    @test_tracker_info(uuid="abb1a53e-6b6c-468f-88b9-ace9ca4d6593")
    def test_release_noserverid(self):
        """A release without the server_id option does not free the lease."""
        addr, siaddr, _ = self._request_address(self.hwaddr)

        # Release without server_id opt is ignored
        release = self._make_release(self.hwaddr, addr, siaddr)
        removeopt(release, 'server_id')
        self._send(release)

        # Not released: request fails
        resp = self._get_response(
            self._make_request(self.other_hwaddr, addr, siaddr))
        self._assert_nak(resp)
        self._assert_broadcast(resp)

    @test_tracker_info(uuid="8415b69e-ae61-4474-8495-d783ba6818d1")
    def test_release_wrongserverid(self):
        """A release with a wrong server_id does not free the lease."""
        addr, siaddr, _ = self._request_address(self.hwaddr)

        # Release with wrong server id
        release = self._make_release(self.hwaddr, addr, siaddr)
        setopt(release, 'server_id', addr)
        self._send(release)

        # Not released: request fails
        resp = self._get_response(
            self._make_request(self.other_hwaddr, addr, siaddr))
        self._assert_nak(resp)
        self._assert_broadcast(resp)

    @test_tracker_info(uuid="0858f678-3db2-4c12-a21b-6e16c5d7e7ce")
    def test_unicast_l2l3(self):
        """A unicast reply is addressed to the client at the Ethernet, IP
        and UDP layers."""
        req_addr = NETADDR_PREFIX + '124'
        resp = self._get_response(self._make_request(
            self.hwaddr, req_addr, siaddr=None))
        self._assert_unicast(resp)
        str_hwaddr = format_hwaddr(self.hwaddr)
        asserts.assert_equal(str_hwaddr, resp.getlayer(Ether).dst)
        asserts.assert_equal(req_addr, resp.getlayer(IP).dst)
        asserts.assert_equal(CLIENT_PORT, resp.getlayer(UDP).dport)

    def _make_macos_10_13_3_paramrequestlist(self):
        """Build the parameter request list option used by the macOS tests."""
        return make_paramrequestlist_opt([
            'subnet_mask',
            121,  # Classless Static Route
            'router',
            'name_server',
            'domain',
            119,  # Domain Search
            252,  # Private/Proxy autodiscovery
            95,  # LDAP
            44,  # NetBIOS over TCP/IP Name Server
            46,  # NetBIOS over TCP/IP Node Type
        ])

    @test_tracker_info(uuid="bf05efe9-ee5b-46ba-9b3c-5a4441c13798")
    def test_macos_10_13_3_discover(self):
        """A macOS 10.13.3-style discover gets a standard offer."""
        req = self._make_discover(self.hwaddr,
                                  options=[
                                      self._make_macos_10_13_3_paramrequestlist(),
                                      ('max_dhcp_size', 1500),
                                      # HW type Ethernet (0x01)
                                      ('client_id', b'\x01' + self.hwaddr),
                                      ('lease_time', 7776000),
                                      ('hostname', b'test12-macbookpro'),
                                  ], opts_padding=bytes(6))
        req.getlayer(BOOTP).secs = 2
        resp = self._get_response(req)
        self._assert_offer(resp)
        self._assert_standard_offer_or_ack(resp)

    @test_tracker_info(uuid="7acc796b-c4f1-46cc-8ffb-0a0efb05ae86")
    def test_macos_10_13_3_request_selecting(self):
        """A macOS 10.13.3-style selecting request gets a standard ack."""
        req = self._make_request(self.hwaddr, None, None,
                                 options=[
                                     self._make_macos_10_13_3_paramrequestlist(),
                                     ('max_dhcp_size', 1500),
                                     # HW type Ethernet (0x01)
                                     ('client_id', b'\x01' + self.hwaddr),
                                     ('requested_addr', NETADDR_PREFIX + '109'),
                                     ('server_id', self.server_addr),
                                     ('hostname', b'test12-macbookpro'),
                                 ])
        req.getlayer(BOOTP).secs = 5
        resp = self._get_response(req)
        self._assert_ack(resp)
        self._assert_standard_offer_or_ack(resp)

    # Note: macOS does not seem to do any rebinding (straight to discover)
    @test_tracker_info(uuid="e8f0b60c-9ea3-4184-8426-151a395bff5b")
    def test_macos_10_13_3_request_renewing(self):
        """A macOS 10.13.3-style renewing request gets a standard ack."""
        req_ip = NETADDR_PREFIX + '109'
        req = self._make_request(self.hwaddr, None, None,
                                 ciaddr=req_ip, ip_src=req_ip,
                                 ip_dst=self.server_addr,
                                 options=[
                                     self._make_macos_10_13_3_paramrequestlist(),
                                     ('max_dhcp_size', 1500),
                                     # HW type Ethernet (0x01)
                                     ('client_id', b'\x01' + self.hwaddr),
                                     ('lease_time', 7776000),
                                     ('hostname', b'test12-macbookpro'),
                                 ], opts_padding=bytes(6))
        resp = self._get_response(req)
        self._assert_ack(resp)
        self._assert_standard_offer_or_ack(resp, renewing=True)

    def _make_win10_paramrequestlist(self):
        """Build the parameter request list option used by the Win10 tests."""
        return make_paramrequestlist_opt([
            'subnet_mask',
            'router',
            'name_server',
            'domain',
            31,  # Perform Router Discover
            33,  # Static Route
            'vendor_specific',
            44,  # NetBIOS over TCP/IP Name Server
            46,  # NetBIOS over TCP/IP Node Type
            47,  # NetBIOS over TCP/IP Scope
            121,  # Classless Static Route
            249,  # Private/Classless Static Route (MS)
            252,  # Private/Proxy autodiscovery
        ])

    @test_tracker_info(uuid="11b3db9c-4cd7-4088-99dc-881f25ce4e76")
    def test_win10_discover(self):
        """A Win10-style discover (broadcast bit set) gets a standard
        broadcast offer."""
        req = self._make_discover(self.hwaddr, bcastbit=True,
                                  options=[
                                      # HW type Ethernet (0x01)
                                      ('client_id', b'\x01' + self.hwaddr),
                                      ('hostname', b'test120-w'),
                                      ('vendor_class_id', b'MSFT 5.0'),
                                      self._make_win10_paramrequestlist(),
                                  ], opts_padding=bytes(11))
        req.getlayer(BOOTP).secs = 2
        resp = self._get_response(req)
        self._assert_offer(resp)
        self._assert_standard_offer_or_ack(resp, bcast=True)

    @test_tracker_info(uuid="4fe04e7f-c643-4a19-b15c-cf417b2c9410")
    def test_win10_request_selecting(self):
        """A Win10-style selecting request gets a standard broadcast ack."""
        req = self._make_request(self.hwaddr, None, None, bcastbit=True,
                                 options=[
                                     ('max_dhcp_size', 1500),
                                     # HW type Ethernet (0x01)
                                     ('client_id', b'\x01' + self.hwaddr),
                                     ('requested_addr', NETADDR_PREFIX + '109'),
                                     ('server_id', self.server_addr),
                                     ('hostname', b'test120-w'),
                                     # Client Fully Qualified Domain Name
                                     (81, b'\x00\x00\x00test120-w.ad.tst.example.com'),
                                     ('vendor_class_id', b'MSFT 5.0'),
                                     self._make_win10_paramrequestlist(),
                                 ])
        resp = self._get_response(req)
        self._assert_ack(resp)
        self._assert_standard_offer_or_ack(resp, bcast=True)

    def _run_win10_request_renewing(self, bcast):
        """Send a Win10-style renewing or rebinding request and check the ack.

        Args:
            bcast: True sends to NETADDR_BROADCAST with the broadcast bit
                set; False sends to the server address without it.
        """
        req_ip = NETADDR_PREFIX + '109'
        ip_dst = NETADDR_BROADCAST if bcast else self.server_addr
        req = self._make_request(self.hwaddr, None, None, bcastbit=bcast,
                                 ciaddr=req_ip, ip_src=req_ip,
                                 ip_dst=ip_dst,
                                 options=[
                                     ('max_dhcp_size', 1500),
                                     # HW type Ethernet (0x01)
                                     ('client_id', b'\x01' + self.hwaddr),
                                     ('hostname', b'test120-w'),
                                     # Client Fully Qualified Domain Name
                                     (81, b'\x00\x00\x00test120-w.ad.tst.example.com'),
                                     ('vendor_class_id', b'MSFT 5.0'),
                                     self._make_win10_paramrequestlist(),
                                 ])
        resp = self._get_response(req)
        self._assert_ack(resp)
        self._assert_standard_offer_or_ack(resp, renewing=True, bcast=bcast)

    @test_tracker_info(uuid="1b23c9c7-cc94-42d0-83a6-f1b2bc125fb9")
    def test_win10_request_renewing(self):
        """Win10-style renewing request sent to the server address."""
        self._run_win10_request_renewing(bcast=False)

    @test_tracker_info(uuid="c846bd14-71fb-4492-a4d3-0aa5c2c35751")
    def test_win10_request_rebinding(self):
        """Win10-style rebinding request sent to the broadcast address."""
        self._run_win10_request_renewing(bcast=True)

    def _make_debian_paramrequestlist(self):
        """Build the parameter request list option used by the Debian
        dhclient tests."""
        # NOTE(review): the comments on 'domain' and 'vendor_specific' below
        # name the NetBIOS options, which the other request lists in this
        # class spell as 44 and 46 - confirm whether the labels or the
        # comments are the intended ones.
        return make_paramrequestlist_opt([
            'subnet_mask',
            'broadcast_address',
            'router',
            'name_server',
            119,  # Domain Search
            'hostname',
            101,  # TCode
            'domain',  # NetBIOS over TCP/IP Name Server
            'vendor_specific',  # NetBIOS over TCP/IP Node Type
            121,  # Classless Static Route
            249,  # Private/Classless Static Route (MS)
            33,  # Static Route
            252,  # Private/Proxy autodiscovery
            'NTP_server',
        ])

@test_tracker_info(uuid="b0bb6ae7-07e6-4ecb-9a2f-db9c8146a3d5") 782 def test_debian_dhclient_4_3_5_discover(self): 783 req_ip = NETADDR_PREFIX + '109' 784 req = self._make_discover(self.hwaddr, 785 options=[ 786 ('requested_addr', req_ip), 787 ('hostname', b'test12'), 788 self._make_debian_paramrequestlist(), 789 ], opts_padding=bytes(26)) 790 resp = self._get_response(req) 791 self._assert_offer(resp) 792 # Don't test for hostname option: previous implementation would not 793 # set it in offer, which was not consistent with ack 794 self._assert_standard_offer_or_ack(resp, ignore_hostname=True) 795 asserts.assert_equal(req_ip, get_yiaddr(resp)) 796 797 @test_tracker_info(uuid="d70bc043-84cb-4735-9123-c46c6d1ce5ac") 798 def test_debian_dhclient_4_3_5_request_selecting(self): 799 req = self._make_request(self.hwaddr, None, None, 800 options=[ 801 ('server_id', self.server_addr), 802 ('requested_addr', NETADDR_PREFIX + '109'), 803 ('hostname', b'test12'), 804 self._make_debian_paramrequestlist(), 805 ], opts_padding=bytes(20)) 806 resp = self._get_response(req) 807 self._assert_ack(resp) 808 self._assert_standard_offer_or_ack(resp, with_hostname=True) 809 810 def _run_debian_renewing(self, bcast): 811 req_ip = NETADDR_PREFIX + '109' 812 req = self._make_request(self.hwaddr, None, None, 813 ciaddr=req_ip, ip_src=req_ip, 814 ip_dst=NETADDR_BROADCAST if bcast else self.server_addr, 815 options=[ 816 ('hostname', b'test12'), 817 self._make_debian_paramrequestlist(), 818 ], 819 opts_padding=bytes(32)) 820 resp = self._get_response(req) 821 self._assert_ack(resp) 822 self._assert_standard_offer_or_ack(resp, renewing=True, 823 with_hostname=True) 824 825 @test_tracker_info(uuid="5e1e817d-9972-46ca-8d44-1e120bf1bafc") 826 def test_debian_dhclient_4_3_5_request_renewing(self): 827 self._run_debian_renewing(bcast=False) 828 829 @test_tracker_info(uuid="b179a36d-910e-4006-a79a-11cc561b69db") 830 def test_debian_dhclient_4_3_5_request_rebinding(self): 831 
self._run_debian_renewing(bcast=True) 832 833 def _assert_standard_offer_or_ack(self, resp, renewing=False, bcast=False, 834 ignore_hostname=False, with_hostname=False): 835 # Responses to renew/rebind are always unicast to ciaddr even with 836 # broadcast flag set (RFC does not define this behavior, but this is 837 # more efficient and matches previous behavior) 838 if bcast and not renewing: 839 self._assert_broadcast(resp) 840 self._assert_broadcastbit(resp, isset=True) 841 else: 842 # Previous implementation would set the broadcast flag but send a 843 # unicast reply if (bcast and renewing). This was not consistent and 844 # new implementation consistently clears the flag. Not testing for 845 # broadcast flag value to maintain compatibility. 846 self._assert_unicast(resp) 847 848 bootp_resp = resp.getlayer(BOOTP) 849 asserts.assert_equal(0, bootp_resp.hops) 850 if renewing: 851 asserts.assert_true(bootp_resp.ciaddr.startswith(NETADDR_PREFIX), 852 'ciaddr does not start with expected prefix') 853 else: 854 asserts.assert_equal(INET4_ANY, bootp_resp.ciaddr) 855 asserts.assert_true(bootp_resp.yiaddr.startswith(NETADDR_PREFIX), 856 'yiaddr does not start with expected prefix') 857 asserts.assert_true(bootp_resp.siaddr.startswith(NETADDR_PREFIX), 858 'siaddr does not start with expected prefix') 859 asserts.assert_equal(INET4_ANY, bootp_resp.giaddr) 860 861 opt_labels = get_opt_labels(bootp_resp) 862 # FQDN option 81 is not supported in new behavior 863 opt_labels = [opt for opt in opt_labels if opt != 81] 864 865 # Expect exactly these options in this order 866 expected_opts = [ 867 'message-type', 'server_id', 'lease_time', 'renewal_time', 868 'rebinding_time', 'subnet_mask', 'broadcast_address', 'router', 869 'name_server'] 870 if ignore_hostname: 871 opt_labels = [opt for opt in opt_labels if opt != 'hostname'] 872 elif with_hostname: 873 expected_opts.append('hostname') 874 expected_opts.extend(['vendor_specific', 'end']) 875 asserts.assert_equal(expected_opts, 
opt_labels) 876 877 def _request_address(self, hwaddr, bcast=True): 878 resp = self._get_response(self._make_discover(hwaddr)) 879 self._assert_offer(resp) 880 addr = get_yiaddr(resp) 881 siaddr = getopt(resp, 'server_id') 882 resp = self._get_response(self._make_request(hwaddr, addr, siaddr, 883 ip_dst=(INET4_ANY if bcast else siaddr))) 884 self._assert_ack(resp) 885 return addr, siaddr, resp 886 887 def _get_response(self, packet): 888 resp = srp1(packet, iface=self.iface, timeout=10, verbose=False) 889 bootp_resp = (resp or None) and resp.getlayer(BOOTP) 890 if bootp_resp != None and get_mess_type(bootp_resp) == ACK: 891 # Note down corresponding release for this request 892 release = self._make_release(bootp_resp.chaddr, bootp_resp.yiaddr, 893 getopt(bootp_resp, 'server_id')) 894 self.cleanup_releases.append(release) 895 return resp 896 897 def _send(self, packet): 898 sendp(packet, iface=self.iface, verbose=False) 899 900 def _assert_type(self, packet, tp): 901 asserts.assert_false(None == packet, "No packet") 902 asserts.assert_equal(tp, get_mess_type(packet)) 903 904 def _assert_ack(self, packet): 905 self._assert_type(packet, ACK) 906 907 def _assert_nak(self, packet): 908 self._assert_type(packet, NAK) 909 910 def _assert_offer(self, packet): 911 self._assert_type(packet, OFFER) 912 913 def _assert_broadcast(self, packet): 914 asserts.assert_false(None == packet, "No packet") 915 asserts.assert_equal(packet.getlayer(Ether).dst, BROADCAST_MAC) 916 asserts.assert_equal(packet.getlayer(IP).dst, NETADDR_BROADCAST) 917 self._assert_broadcastbit(packet) 918 919 def _assert_broadcastbit(self, packet, isset=True): 920 mask = 0x8000 921 flag = packet.getlayer(BOOTP).flags 922 asserts.assert_equal(flag & mask, mask if isset else 0) 923 924 def _assert_unicast(self, packet, ipAddr=None): 925 asserts.assert_false(None == packet, "No packet") 926 asserts.assert_false(packet.getlayer(Ether).dst == BROADCAST_MAC, 927 "Layer 2 packet destination address was broadcast") 
928 if ipAddr: 929 asserts.assert_equal(packet.getlayer(IP).dst, ipAddr) 930 931 def _assert_relayed(self, packet, giaddr): 932 self._assert_unicast(packet, giaddr) 933 asserts.assert_equal(giaddr, packet.getlayer(BOOTP).giaddr, 934 'Relayed response has invalid giaddr field') 935 936 def _next_hwaddr(self): 937 addr = make_hwaddr(self.next_hwaddr_index) 938 self.next_hwaddr_index = self.next_hwaddr_index + 1 939 return addr 940 941 def _make_dhcp(self, src_hwaddr, options, ciaddr=INET4_ANY, 942 ip_src=INET4_ANY, ip_dst=NETADDR_BROADCAST, giaddr=INET4_ANY, 943 bcastbit=False): 944 broadcast = (ip_dst == NETADDR_BROADCAST) 945 ethernet = Ether(dst=(BROADCAST_MAC if broadcast else self.dut_hwaddr)) 946 ip = IP(src=ip_src, dst=ip_dst) 947 udp = UDP(sport=68, dport=SERVER_PORT) 948 bootp = BOOTP(chaddr=src_hwaddr, ciaddr=ciaddr, giaddr=giaddr, 949 flags=(0x8000 if bcastbit else 0), xid=random.randrange(0, 2**32)) 950 dhcp = DHCP(options=options) 951 return ethernet / ip / udp / bootp / dhcp 952 953 def _make_discover(self, src_hwaddr, options = [], giaddr=INET4_ANY, 954 bcastbit=False, opts_padding=None, ip_src=INET4_ANY): 955 opts = [('message-type','discover')] 956 opts.extend(options) 957 opts.append('end') 958 if (opts_padding): 959 opts.append(opts_padding) 960 return self._make_dhcp(src_hwaddr, options=opts, giaddr=giaddr, 961 ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit, ip_src=ip_src) 962 963 def _make_request(self, src_hwaddr, reqaddr, siaddr, ciaddr=INET4_ANY, 964 ip_dst=None, ip_src=None, giaddr=INET4_ANY, bcastbit=False, 965 options=[], opts_padding=None): 966 if not ip_dst: 967 ip_dst = siaddr or INET4_ANY 968 969 if not ip_src and ip_dst == INET4_ANY: 970 ip_src = INET4_ANY 971 elif not ip_src: 972 ip_src = (giaddr if not isempty(giaddr) 973 else ciaddr if not isempty(ciaddr) 974 else reqaddr) 975 # Kernel will not receive unicast UDP packets with empty ip_src 976 asserts.assert_false(ip_dst != INET4_ANY and isempty(ip_src), 977 "Unicast ip_src cannot be 
zero") 978 opts = [('message-type', 'request')] 979 if options: 980 opts.extend(options) 981 else: 982 if siaddr: 983 opts.append(('server_id', siaddr)) 984 if reqaddr: 985 opts.append(('requested_addr', reqaddr)) 986 opts.append('end') 987 if opts_padding: 988 opts.append(opts_padding) 989 return self._make_dhcp(src_hwaddr, options=opts, ciaddr=ciaddr, 990 ip_src=ip_src, ip_dst=ip_dst, giaddr=giaddr, bcastbit=bcastbit) 991 992 def _make_release(self, src_hwaddr, addr, server_id): 993 opts = [('message-type', 'release'), ('server_id', server_id), 'end'] 994 return self._make_dhcp(src_hwaddr, opts, ciaddr=addr, ip_src=addr, 995 ip_dst=server_id) 996 997def assert_bootp_response(resp, req): 998 bootp = resp.getlayer(BOOTP) 999 asserts.assert_equal(2, bootp.op, 'Invalid BOOTP op') 1000 asserts.assert_equal(1, bootp.htype, 'Invalid BOOTP htype') 1001 asserts.assert_equal(6, bootp.hlen, 'Invalid BOOTP hlen') 1002 asserts.assert_equal(0, bootp.hops, 'Invalid BOOTP hops') 1003 asserts.assert_equal(req.getlayer(BOOTP).xid, bootp.xid, 'Invalid XID') 1004 return bootp 1005 1006 1007def make_paramrequestlist_opt(params): 1008 param_indexes = [DHCPRevOptions[opt][0] if isinstance(opt, str) else opt 1009 for opt in params] 1010 return tuple(['param_req_list'] + [ 1011 opt.to_bytes(1, byteorder='big') if isinstance(opt, int) else opt 1012 for opt in param_indexes]) 1013 1014 1015def isempty(addr): 1016 return not addr or addr == INET4_ANY 1017 1018 1019def setopt(packet, optname, val): 1020 dhcp = packet.getlayer(DHCP) 1021 if optname in get_opt_labels(dhcp): 1022 dhcp.options = [(optname, val) if opt[0] == optname else opt 1023 for opt in dhcp.options] 1024 else: 1025 # Add before the last option (last option is "end") 1026 dhcp.options.insert(len(dhcp.options) - 1, (optname, val)) 1027 1028 1029def getopt(packet, key): 1030 opts = [opt[1] for opt in packet.getlayer(DHCP).options if opt[0] == key] 1031 return opts[0] if opts else None 1032 1033 1034def removeopt(packet, key): 
1035 dhcp = packet.getlayer(DHCP) 1036 dhcp.options = [opt for opt in dhcp.options if opt[0] != key] 1037 1038 1039def get_opt_labels(packet): 1040 dhcp_resp = packet.getlayer(DHCP) 1041 # end option is a single string, not a tuple. 1042 return [opt if isinstance(opt, str) else opt[0] 1043 for opt in dhcp_resp.options if opt != 'pad'] 1044 1045 1046def get_yiaddr(packet): 1047 return packet.getlayer(BOOTP).yiaddr 1048 1049 1050def get_chaddr(packet): 1051 # We use Ethernet addresses. Ignore address padding 1052 return packet.getlayer(BOOTP).chaddr[:6] 1053 1054 1055def get_mess_type(packet): 1056 return getopt(packet, 'message-type') 1057 1058 1059def make_hwaddr(index): 1060 if index > 0xffff: 1061 raise ValueError("Address index out of range") 1062 return b'\x44\x85\x00\x00' + bytes([index >> 8, index & 0xff]) 1063 1064 1065def format_hwaddr(addr): 1066 return ':'.join(['%02x' % c for c in addr]) 1067