from acts import asserts
from acts import base_test
from acts.controllers import android_device
from acts.test_decorators import test_tracker_info

from scapy.all import *
from threading import Event
from threading import Thread
import random
import time
import warnings


CLIENT_PORT = 68
SERVER_PORT = 67
BROADCAST_MAC = 'ff:ff:ff:ff:ff:ff'
INET4_ANY = '0.0.0.0'
NETADDR_PREFIX = '192.168.42.'
OTHER_NETADDR_PREFIX = '192.168.43.'
NETADDR_BROADCAST = '255.255.255.255'
SUBNET_BROADCAST = NETADDR_PREFIX + '255'


# DHCP message-type values used by the tests
OFFER = 2
REQUEST = 3
ACK = 5
NAK = 6


pmc_base_cmd = (
    "am broadcast -a com.android.pmc.action.AUTOPOWER --es PowerAction ")
start_pmc_cmd = (
    "am start -S -n com.android.pmc/com.android.pmc.PMCMainActivity")
pmc_start_usb_tethering_cmd = "%sStartUSBTethering" % pmc_base_cmd
pmc_stop_usb_tethering_cmd = "%sStopUSBTethering" % pmc_base_cmd


class DhcpServerTest(base_test.BaseTestClass):
    """Tests for the DHCP server of a device reached over USB tethering.

    setup_class enables USB tethering on the first Android device, finds the
    host interface that appears as a result and sends a first DHCP discover
    to learn the server address. Each test then exchanges DHCP packets with
    the device and checks the responses.
    """

    def setup_class(self):
        """Enable tethering, start the ARP responder and find the server."""
        self.dut = self.android_devices[0]
        self.USB_TETHERED = False
        self.next_hwaddr_index = 0
        self.stop_arp = Event()

        conf.checkIPaddr = 0
        conf.checkIPsrc = 0
        # Allow using non-67 server ports as long as client uses 68
        bind_layers(UDP, BOOTP, dport=CLIENT_PORT)

        self.dut.adb.shell(start_pmc_cmd)
        self.dut.adb.shell("setprop log.tag.PMC VERBOSE")
        iflist_before = get_if_list()
        self._start_usb_tethering(self.dut)
        self.iface = self._wait_for_new_iface(iflist_before)
        self.real_hwaddr = get_if_raw_hwaddr(self.iface)

        # Start a thread to answer to all ARP "who-has". The sniffer only
        # re-checks the stop event when a packet arrives, so the thread is a
        # daemon: it must never keep the interpreter alive on its own.
        thread = Thread(target=self._sniff_arp, args=(self.stop_arp,),
                        daemon=True)
        thread.start()

        # Discover server IP and device hwaddr
        hwaddr = self._next_hwaddr()
        resp = self._get_response(self._make_discover(hwaddr))
        asserts.assert_false(resp is None,
                             "Device did not reply to first DHCP discover")
        self.server_addr = getopt(resp, 'server_id')
        self.dut_hwaddr = resp.getlayer(Ether).src
        asserts.assert_false(self.server_addr is None,
                             "DHCP server did not specify server identifier")
        # Ensure that we don't depend on assigned route/gateway on the host
        conf.route.add(host=self.server_addr, dev=self.iface, gw="0.0.0.0")

    def setup_test(self):
        """Pick fresh client hardware addresses and reset cleanup state."""
        # Some versions of scapy do not close the receive file properly
        warnings.filterwarnings("ignore", category=ResourceWarning)

        bind_layers(UDP, BOOTP, dport=CLIENT_PORT)
        self.hwaddr = self._next_hwaddr()
        self.other_hwaddr = self._next_hwaddr()
        self.cleanup_releases = []

    def teardown_test(self):
        """Send every packet queued in cleanup_releases."""
        for packet in self.cleanup_releases:
            self._send(packet)

    def teardown_class(self):
        """Signal the ARP responder to stop and disable USB tethering."""
        self.stop_arp.set()
        self._stop_usb_tethering(self.dut)

    def on_fail(self, test_name, begin_time):
        """Capture a bug report from the device for a failed test."""
        self.dut.take_bug_report(test_name, begin_time)

    def _start_usb_tethering(self, dut):
        """ Start USB tethering

        Args:
            1. dut - ad object
        """
        self.log.info("Starting USB Tethering")
        dut.stop_services()
        dut.adb.shell(pmc_start_usb_tethering_cmd)
        self._wait_for_device(dut)
        self.USB_TETHERED = True

    def _stop_usb_tethering(self, dut):
        """ Stop USB tethering

        Args:
            1. dut - ad object
        """
        self.log.info("Stopping USB Tethering")
        dut.adb.shell(pmc_stop_usb_tethering_cmd)
        self._wait_for_device(dut)
        dut.start_services()
        self.USB_TETHERED = False

    def _wait_for_device(self, dut):
        """ Wait for device to come back online

        Args:
            1. dut - ad object
        """
        while dut.serial not in android_device.list_adb_devices():
            # Poll at a modest rate instead of spinning a CPU core while the
            # device re-enumerates.
            time.sleep(0.5)
        dut.adb.wait_for_device()

    def _wait_for_new_iface(self, old_ifaces):
        """Return the name of the single interface not in old_ifaces.

        Fails the test if more than one new interface shows up, or if none
        appears within the retry budget.

        Args:
            old_ifaces: interface names that existed before tethering.
        """
        old_set = set(old_ifaces)
        # Try 10 times to find a new interface with a 1s sleep every time
        # (equivalent to a 9s timeout)
        for _ in range(10):
            new_ifaces = set(get_if_list()) - old_set
            asserts.assert_true(
                len(new_ifaces) < 2,
                "Too many new interfaces after turning on tethering")
            if len(new_ifaces) == 1:
                return new_ifaces.pop()
            time.sleep(1)
        asserts.fail("Timeout waiting for tethering interface on host")

    def _sniff_arp(self, stop_arp):
        """Thread target: answer ARP requests until stop_arp is set.

        Args:
            stop_arp: threading.Event checked after each sniffed packet.
        """
        try:
            sniff(iface=self.iface, filter='arp', prn=self._handle_arp,
                  store=0, stop_filter=lambda p: stop_arp.is_set())
        except Exception:
            # sniff may raise when tethering is disconnected. Ignore
            # exceptions if stop was requested.
            if not stop_arp.is_set():
                raise

    def _handle_arp(self, packet):
        """Reply to an ARP who-has, claiming the requested address."""
        # Reply to all arp "who-has": say we have everything
        if packet[ARP].op == ARP.who_has:
            reply = ARP(op=ARP.is_at, hwsrc=self.real_hwaddr,
                        psrc=packet.pdst, hwdst=BROADCAST_MAC,
                        pdst=SUBNET_BROADCAST)
            sendp(Ether(dst=BROADCAST_MAC, src=self.real_hwaddr) / reply,
                  iface=self.iface, verbose=False)

    @test_tracker_info(uuid="a8712769-977a-4ee1-902f-90b3ba30b40c")
    def test_config_assumptions(self):
        """The server answers a discover with an address in NETADDR_PREFIX."""
        resp = self._get_response(self._make_discover(self.hwaddr))
        asserts.assert_false(resp is None, "Device did not reply to discover")
        asserts.assert_true(get_yiaddr(resp).startswith(NETADDR_PREFIX),
                            "Server does not use expected prefix")

    @test_tracker_info(uuid="e3761689-7d64-46b1-97ce-15f315eaf568")
    def test_discover_broadcastbit(self):
        """A discover with the broadcast bit gets a broadcast offer."""
        resp = self._get_response(
            self._make_discover(self.hwaddr, bcastbit=True))
        self._assert_offer(resp)
        self._assert_broadcast(resp)

    @test_tracker_info(uuid="30a7ea7c-c20f-4c46-aaf2-96f19d8f8191")
    def test_discover_bootpfields(self):
        """Check the BOOTP header fields of the offer to a plain discover."""
        discover = self._make_discover(self.hwaddr)
        resp = self._get_response(discover)
        self._assert_offer(resp)
        self._assert_unicast(resp)
        bootp = assert_bootp_response(resp, discover)
        asserts.assert_equal(INET4_ANY, bootp.ciaddr)
        asserts.assert_equal(self.server_addr, bootp.siaddr)
        asserts.assert_equal(INET4_ANY, bootp.giaddr)
        asserts.assert_equal(self.hwaddr, get_chaddr(bootp))

    @test_tracker_info(uuid="593f4051-516d-44fa-8834-7d485362f182")
    def test_discover_relayed_broadcastbit(self):
        """A relayed discover with the broadcast bit is answered via relay."""
        giaddr = NETADDR_PREFIX + '123'
        resp = self._get_response(
            self._make_discover(self.hwaddr, giaddr=giaddr, bcastbit=True))
        self._assert_offer(resp)
        self._assert_relayed(resp, giaddr)
        self._assert_broadcastbit(resp)

    def _run_discover_paramrequestlist(self, params, unwanted_params):
        """Send a discover requesting params and check the offered options.

        Args:
            params: option labels to request; all must come back, in order.
            unwanted_params: option labels that must not be in the offer.

        Returns:
            The offer received from the server.
        """
        params_opt = make_paramrequestlist_opt(params)
        resp = self._get_response(
            self._make_discover(self.hwaddr, options=[params_opt]))

        self._assert_offer(resp)
        # List of requested params in response order
        resp_opts = get_opt_labels(resp)
        resp_requested_opts = [opt for opt in resp_opts if opt in params]
        # All above params should be supported, order should be conserved
        asserts.assert_equal(params, resp_requested_opts)
        asserts.assert_equal(0, len(set(resp_opts) & set(unwanted_params)))
        return resp

    @test_tracker_info(uuid="00a8a3f6-f143-47ff-a79b-482c607fb5b8")
    def test_discover_paramrequestlist(self):
        """Requested network parameters are returned with expected values."""
        resp = self._run_discover_paramrequestlist(
            ['subnet_mask', 'broadcast_address', 'router', 'name_server'],
            unwanted_params=[])
        for opt in ['broadcast_address', 'router', 'name_server']:
            asserts.assert_true(
                getopt(resp, opt).startswith(NETADDR_PREFIX),
                opt + ' does not start with ' + NETADDR_PREFIX)

        subnet_mask = getopt(resp, 'subnet_mask')
        asserts.assert_true(subnet_mask.startswith('255.255.'),
                            'Unexpected subnet mask for /16+: ' + subnet_mask)

    @test_tracker_info(uuid="d1aad4a3-9eab-4900-aa6a-5b82a4a64f46")
    def test_discover_paramrequestlist_rev(self):
        """Skipped: parameters requested in reverse order."""
        # RFC2132 #9.8: "The DHCP server is not required to return the options
        # in the requested order, but MUST try to insert the requested options
        # in the order requested"
        asserts.skip('legacy behavior not compliant: fixed order used')
        self._run_discover_paramrequestlist(
            ['name_server', 'router', 'broadcast_address', 'subnet_mask'],
            unwanted_params=[])

    @test_tracker_info(uuid="e3ae6335-8cc7-4bf1-bb58-67646b727f2b")
    def test_discover_paramrequestlist_unwanted(self):
        """Skipped: parameters that were not requested are not returned."""
        asserts.skip('legacy behavior always sends all parameters')
        self._run_discover_paramrequestlist(
            ['router', 'name_server'],
            unwanted_params=['broadcast_address', 'subnet_mask'])

    def _assert_renews(self, request, addr, exp_time, resp_type=ACK):
        """Send request after a 3s pause and check that the lease is renewed.

        Args:
            request: packet to send.
            addr: address expected in the response's yiaddr.
            exp_time: expected lease time; the response may differ by up to 2.
            resp_type: expected DHCP message type of the response.

        Returns:
            The response received from the server.
        """
        # Sleep to test lease time renewal
        time.sleep(3)
        resp = self._get_response(request)
        self._assert_type(resp, resp_type)
        asserts.assert_equal(addr, get_yiaddr(resp))
        remaining_lease = getopt(resp, 'lease_time')
        # Lease renewed: waited for 3s, lease time not decreased by more than 2
        asserts.assert_true(remaining_lease >= exp_time - 2,
                            'Lease not renewed')
        # Lease times should be consistent across offers/renewals
        asserts.assert_true(remaining_lease <= exp_time + 2,
                            'Lease time inconsistent')
        return resp

    @test_tracker_info(uuid="d6b598b7-f443-4b5a-ba80-4af5d211cade")
    def test_discover_assigned_ownaddress(self):
        """A new discover from a lease holder is offered the same address."""
        addr, siaddr, resp = self._request_address(self.hwaddr)

        lease_time = getopt(resp, 'lease_time')
        server_id = getopt(resp, 'server_id')
        asserts.assert_true(lease_time >= 60, "Lease time is too short")
        asserts.assert_false(addr == INET4_ANY, "Assigned address is empty")
        # Wait to test lease expiration time change
        time.sleep(3)

        # New discover, same address
        resp = self._assert_renews(self._make_discover(self.hwaddr),
                                   addr, lease_time, resp_type=OFFER)
        self._assert_unicast(resp, get_yiaddr(resp))
        self._assert_broadcastbit(resp, isset=False)

    @test_tracker_info(uuid="cbb07d77-912b-4269-bbbc-adba99779587")
    def test_discover_assigned_otherhost(self):
        """An address leased to one client is not offered to another."""
        addr, siaddr, _ = self._request_address(self.hwaddr)

        # New discover, same address, different client
        resp = self._get_response(self._make_discover(
            self.other_hwaddr, [('requested_addr', addr)]))

        self._assert_offer(resp)
        asserts.assert_false(get_yiaddr(resp) == addr,
                             "Already assigned address offered")
        self._assert_unicast(resp, get_yiaddr(resp))
        self._assert_broadcastbit(resp, isset=False)

    @test_tracker_info(uuid="3d2b3d2f-eb5f-498f-b887-3b4638cebf14")
    def test_discover_requestaddress(self):
        """A requested, uncommitted address is offered to each requester."""
        addr = NETADDR_PREFIX + '200'
        resp = self._get_response(self._make_discover(
            self.hwaddr, [('requested_addr', addr)]))
        self._assert_offer(resp)
        asserts.assert_equal(get_yiaddr(resp), addr)

        # Lease not committed: can request again
        resp = self._get_response(self._make_discover(
            self.other_hwaddr, [('requested_addr', addr)]))
        self._assert_offer(resp)
        asserts.assert_equal(get_yiaddr(resp), addr)

    @test_tracker_info(uuid="5ffd9d25-304e-434b-bedb-56ccf27dcebd")
    def test_discover_requestaddress_wrongsubnet(self):
        """A requested address outside the subnet is not offered."""
        addr = OTHER_NETADDR_PREFIX + '200'
        resp = self._get_response(
            self._make_discover(self.hwaddr, [('requested_addr', addr)]))
        self._assert_offer(resp)
        self._assert_unicast(resp)
        asserts.assert_false(get_yiaddr(resp) == addr,
                             'Server offered invalid address')

    @test_tracker_info(uuid="f7d6a92f-9386-4b65-b6c1-d0a3f11213bf")
    def test_discover_giaddr_outside_subnet(self):
        """A discover relayed from outside the subnet gets no response."""
        giaddr = OTHER_NETADDR_PREFIX + '201'
        resp = self._get_response(
            self._make_discover(self.hwaddr, giaddr=giaddr))
        asserts.assert_equal(resp, None)

    @test_tracker_info(uuid="1348c79a-9203-4bb8-b33b-af80bacd17b1")
    def test_discover_srcaddr_outside_subnet(self):
        """A discover with an out-of-subnet IP source still gets an offer."""
        srcaddr = OTHER_NETADDR_PREFIX + '200'
        resp = self._get_response(
            self._make_discover(self.hwaddr, ip_src=srcaddr))
        self._assert_offer(resp)
        asserts.assert_false(srcaddr == get_yiaddr(resp),
                             'Server offered invalid address')

    @test_tracker_info(uuid="a03bb783-8665-4c66-9c0c-1bb02ddca07e")
    def test_discover_requestaddress_giaddr_outside_subnet(self):
        """A valid requested address via an out-of-subnet relay is ignored."""
        addr = NETADDR_PREFIX + '200'
        giaddr = OTHER_NETADDR_PREFIX + '201'
        req = self._make_discover(self.hwaddr, [('requested_addr', addr)],
                                  ip_src=giaddr, giaddr=giaddr)
        resp = self._get_response(req)
        asserts.assert_equal(resp, None)

    @test_tracker_info(uuid="725956af-71e2-45d8-b8b3-402d21bfc7db")
    def test_discover_knownaddress_giaddr_outside_subnet(self):
        """A lease holder relayed from outside the subnet gets no response."""
        addr, siaddr, _ = self._request_address(self.hwaddr)

        # New discover, same client, through relay in invalid subnet
        giaddr = OTHER_NETADDR_PREFIX + '200'
        resp = self._get_response(
            self._make_discover(self.hwaddr, giaddr=giaddr))
        asserts.assert_equal(resp, None)

    @test_tracker_info(uuid="2ee9d5b1-c15d-40c4-98e9-63202d1f1557")
    def test_discover_knownaddress_giaddr_valid_subnet(self):
        """A lease holder relayed from inside the subnet gets an offer."""
        addr, siaddr, _ = self._request_address(self.hwaddr)

        # New discover, same client, through relay in valid subnet
        giaddr = NETADDR_PREFIX + '200'
        resp = self._get_response(
            self._make_discover(self.hwaddr, giaddr=giaddr))
        self._assert_offer(resp)
        self._assert_unicast(resp, giaddr)
        self._assert_broadcastbit(resp, isset=False)

    @test_tracker_info(uuid="f43105a5-633a-417a-8a07-39bc36c493e7")
    def test_request_unicast(self):
        """A non-broadcast address request is answered by unicast."""
        addr, siaddr, resp = self._request_address(self.hwaddr, bcast=False)
        self._assert_unicast(resp, addr)

    @test_tracker_info(uuid="09f3c1c4-1202-4f85-a965-4d86aee069f3")
    def test_request_bootpfields(self):
        """Check the BOOTP header fields of the ack to a request."""
        req_addr = NETADDR_PREFIX + '200'
        req = self._make_request(self.hwaddr, req_addr, self.server_addr)
        resp = self._get_response(req)
        self._assert_ack(resp)
        bootp = assert_bootp_response(resp, req)
        asserts.assert_equal(INET4_ANY, bootp.ciaddr)
        asserts.assert_equal(self.server_addr, bootp.siaddr)
        asserts.assert_equal(INET4_ANY, bootp.giaddr)
        asserts.assert_equal(self.hwaddr, get_chaddr(bootp))

    @test_tracker_info(uuid="ec00d268-80cb-4be5-9771-2292cc7d2e18")
    def test_request_selecting_inuse(self):
        """Requesting an address leased to another client gets a NAK."""
        addr, siaddr, _ = self._request_address(self.hwaddr)
        new_req = self._make_request(self.other_hwaddr, addr, siaddr)
        resp = self._get_response(new_req)
        self._assert_nak(resp)
        self._assert_broadcast(resp)
        bootp = assert_bootp_response(resp, new_req)
        asserts.assert_equal(INET4_ANY, bootp.ciaddr)
        asserts.assert_equal(INET4_ANY, bootp.yiaddr)
        asserts.assert_equal(INET4_ANY, bootp.siaddr)
        asserts.assert_equal(INET4_ANY, bootp.giaddr)
        asserts.assert_equal(self.other_hwaddr, get_chaddr(bootp))
        asserts.assert_equal(
            ['message-type', 'server_id', 56, 'end'],  # 56 is "message" opt
            get_opt_labels(bootp))
        asserts.assert_equal(self.server_addr, getopt(bootp, 'server_id'))

    @test_tracker_info(uuid="0643c179-3542-4297-9b06-8d86ff785e9c")
    def test_request_selecting_wrongsiaddr(self):
        """A request naming a different server gets no response."""
        addr = NETADDR_PREFIX + '200'
        wrong_siaddr = NETADDR_PREFIX + '201'
        asserts.assert_false(
            wrong_siaddr == self.server_addr,
            'Test assumption not met: server addr is ' + wrong_siaddr)
        resp = self._get_response(
            self._make_request(self.hwaddr, addr, siaddr=wrong_siaddr))
        asserts.assert_true(
            resp is None,
            'Received response for request with incorrect siaddr')

    @test_tracker_info(uuid="676beab2-4af8-4bf0-a4ad-c7626ae5987f")
    def test_request_selecting_giaddr_outside_subnet(self):
        """A request relayed from outside the subnet gets no response."""
        addr = NETADDR_PREFIX + '200'
        giaddr = OTHER_NETADDR_PREFIX + '201'
        resp = self._get_response(
            self._make_request(self.hwaddr, addr, siaddr=self.server_addr,
                               giaddr=giaddr))
        asserts.assert_equal(resp, None)

    @test_tracker_info(uuid="fe17df0c-2f41-416f-bb76-d75b74b63c0f")
    def test_request_selecting_hostnameupdate(self):
        """Re-requesting the same address with a new hostname is acked."""
        addr = NETADDR_PREFIX + '123'
        hostname1 = b'testhostname1'
        hostname2 = b'testhostname2'
        req = self._make_request(self.hwaddr, None, None,
                                 options=[
                                     ('requested_addr', addr),
                                     ('server_id', self.server_addr),
                                     ('hostname', hostname1)])
        resp = self._get_response(req)
        self._assert_ack(resp)
        self._assert_unicast(resp, addr)
        # NOTE(review): this reads the hostname back from the request, not
        # from the response, so it only checks what was sent. Confirm whether
        # the response was meant to be checked.
        asserts.assert_equal(hostname1, getopt(req, 'hostname'))

        # Re-request with different hostname
        setopt(req, 'hostname', hostname2)
        resp = self._get_response(req)
        self._assert_ack(resp)
        self._assert_unicast(resp, addr)
        asserts.assert_equal(hostname2, getopt(req, 'hostname'))

    def _run_initreboot(self, bcastbit):
        """Obtain a lease, then renew it with an init-reboot request.

        Args:
            bcastbit: value of the broadcast bit in the init-reboot request.

        Returns:
            The response to the init-reboot request.
        """
        addr, siaddr, resp = self._request_address(self.hwaddr)
        exp = getopt(resp, 'lease_time')

        # init-reboot: siaddr is None
        return self._assert_renews(self._make_request(
            self.hwaddr, addr, siaddr=None, bcastbit=bcastbit), addr, exp)

    @test_tracker_info(uuid="263c91b9-cfe9-4f21-985d-b7046df80528")
    def test_request_initreboot(self):
        """An init-reboot request without broadcast bit is acked unicast."""
        resp = self._run_initreboot(bcastbit=False)
        self._assert_unicast(resp)
        self._assert_broadcastbit(resp, isset=False)

    @test_tracker_info(uuid="f05dd60f-03dd-4e2b-8e58-80f4d752ad51")
    def test_request_initreboot_broadcastbit(self):
        """An init-reboot request with broadcast bit is acked by broadcast."""
        resp = self._run_initreboot(bcastbit=True)
        self._assert_broadcast(resp)

    @test_tracker_info(uuid="5563c616-2136-47f6-9151-4e28cbfe797c")
    def test_request_initreboot_nolease(self):
        """Skipped: init-reboot without an existing lease gets no response."""
        # RFC2131 #4.3.2
        asserts.skip("legacy behavior not compliant")
        addr = NETADDR_PREFIX + '123'
        resp = self._get_response(self._make_request(self.hwaddr, addr, None))
        asserts.assert_equal(resp, None)

    @test_tracker_info(uuid="da5c5537-cb38-4a2e-828f-44bc97976fe5")
    def test_request_initreboot_incorrectlease(self):
        """Init-reboot for an address other than the leased one gets a NAK."""
        otheraddr = NETADDR_PREFIX + '123'
        addr, siaddr, _ = self._request_address(self.hwaddr)
        asserts.assert_false(
            addr == otheraddr,
            "Test assumption not met: server assigned " + otheraddr)

        resp = self._get_response(
            self._make_request(self.hwaddr, otheraddr, siaddr=None))
        self._assert_nak(resp)
        self._assert_broadcast(resp)

    @test_tracker_info(uuid="ce42ba57-07be-427b-9cbd-5535c62b0120")
    def test_request_initreboot_wrongnet(self):
        """Init-reboot for an address on another network gets a NAK."""
        resp = self._get_response(self._make_request(
            self.hwaddr, OTHER_NETADDR_PREFIX + '1', siaddr=None))
        self._assert_nak(resp)
        self._assert_broadcast(resp)

    def _run_rebinding(self, bcastbit, giaddr=INET4_ANY):
        """Obtain a lease, then renew it with a rebinding request.

        Args:
            bcastbit: value of the broadcast bit in the rebinding request.
            giaddr: relay address to put in the rebinding request.

        Returns:
            A (response, leased address) tuple.
        """
        addr, siaddr, resp = self._request_address(self.hwaddr)
        exp = getopt(resp, 'lease_time')

        # Rebinding: no siaddr or reqaddr
        resp = self._assert_renews(
            self._make_request(self.hwaddr, reqaddr=None, siaddr=None,
                               ciaddr=addr, giaddr=giaddr, ip_src=addr,
                               ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit),
            addr, exp)
        return resp, addr

    @test_tracker_info(uuid="68bfcb25-5873-41ad-ad0a-bf22781534ca")
    def test_request_rebinding(self):
        """A rebinding request is acked by unicast to the leased address."""
        resp, addr = self._run_rebinding(bcastbit=False)
        self._assert_unicast(resp, addr)
        self._assert_broadcastbit(resp, isset=False)

    @test_tracker_info(uuid="4c591536-8062-40ec-ae12-1ebe7dcad8e2")
    def test_request_rebinding_relayed(self):
        """A relayed rebinding request is answered through the relay."""
        giaddr = NETADDR_PREFIX + '123'
        resp, _ = self._run_rebinding(bcastbit=False, giaddr=giaddr)
        self._assert_relayed(resp, giaddr)
        self._assert_broadcastbit(resp, isset=False)

    @test_tracker_info(uuid="cee2668b-bd79-47d7-b358-8f9387d715b1")
    def test_request_rebinding_inuse(self):
        """Rebinding to an address leased to another client gets a NAK."""
        addr, siaddr, _ = self._request_address(self.hwaddr)

        resp = self._get_response(self._make_request(
            self.other_hwaddr, reqaddr=None, siaddr=None, ciaddr=addr))
        self._assert_nak(resp)
        self._assert_broadcast(resp)

    @test_tracker_info(uuid="d95d69b5-ab9a-42f5-8dd0-b9b6a6d960cc")
    def test_request_rebinding_wrongaddr(self):
        """Rebinding with a ciaddr other than the leased one gets a NAK."""
        otheraddr = NETADDR_PREFIX + '123'
        addr, siaddr, _ = self._request_address(self.hwaddr)
        asserts.assert_false(
            addr == otheraddr,
            "Test assumption not met: server assigned " + otheraddr)

        resp = self._get_response(self._make_request(
            self.hwaddr, reqaddr=None, siaddr=siaddr, ciaddr=otheraddr))
        self._assert_nak(resp)
        self._assert_broadcast(resp)

    @test_tracker_info(uuid="421a86b3-8779-4910-8050-7806536efabb")
    def test_request_rebinding_wrongaddr_relayed(self):
        """A relayed rebinding with a wrong ciaddr gets a NAK via the relay."""
        otheraddr = NETADDR_PREFIX + '123'
        relayaddr = NETADDR_PREFIX + '124'
        addr, siaddr, _ = self._request_address(self.hwaddr)
        asserts.assert_false(
            addr == otheraddr,
            "Test assumption not met: server assigned " + otheraddr)
        asserts.assert_false(
            addr == relayaddr,
            "Test assumption not met: server assigned " + relayaddr)

        req = self._make_request(self.hwaddr, reqaddr=None, siaddr=None,
                                 ciaddr=otheraddr, giaddr=relayaddr)

        resp = self._get_response(req)
        self._assert_nak(resp)
        self._assert_relayed(resp, relayaddr)
        self._assert_broadcastbit(resp)

    @test_tracker_info(uuid="6ff1fab4-009a-4758-9153-0d9db63423da")
    def test_release(self):
        """A released address can be requested by another client."""
        addr, siaddr, _ = self._request_address(self.hwaddr)
        # Re-requesting fails
        resp = self._get_response(
            self._make_request(self.other_hwaddr, addr, siaddr))
        self._assert_nak(resp)
        self._assert_broadcast(resp)

        # Succeeds after release
        self._send(self._make_release(self.hwaddr, addr, siaddr))
        resp = self._get_response(
            self._make_request(self.other_hwaddr, addr, siaddr))
        self._assert_ack(resp)

    @test_tracker_info(uuid="abb1a53e-6b6c-468f-88b9-ace9ca4d6593")
    def test_release_noserverid(self):
        """A release without a server_id option does not free the lease."""
        addr, siaddr, _ = self._request_address(self.hwaddr)

        # Release without server_id opt is ignored
        release = self._make_release(self.hwaddr, addr, siaddr)
        removeopt(release, 'server_id')
        self._send(release)

        # Not released: request fails
        resp = self._get_response(
            self._make_request(self.other_hwaddr, addr, siaddr))
        self._assert_nak(resp)
        self._assert_broadcast(resp)

    @test_tracker_info(uuid="8415b69e-ae61-4474-8495-d783ba6818d1")
    def test_release_wrongserverid(self):
        """A release with a wrong server_id does not free the lease."""
        addr, siaddr, _ = self._request_address(self.hwaddr)

        # Release with wrong server id
        release = self._make_release(self.hwaddr, addr, siaddr)
        setopt(release, 'server_id', addr)
        self._send(release)

        # Not released: request fails
        resp = self._get_response(
            self._make_request(self.other_hwaddr, addr, siaddr))
        self._assert_nak(resp)
        self._assert_broadcast(resp)

    @test_tracker_info(uuid="0858f678-3db2-4c12-a21b-6e16c5d7e7ce")
    def test_unicast_l2l3(self):
        """A unicast reply targets the client at the L2, L3 and UDP layers."""
        reqAddr = NETADDR_PREFIX + '124'
        resp = self._get_response(self._make_request(
            self.hwaddr, reqAddr, siaddr=None))
        self._assert_unicast(resp)
        str_hwaddr = format_hwaddr(self.hwaddr)
        asserts.assert_equal(str_hwaddr, resp.getlayer(Ether).dst)
        asserts.assert_equal(reqAddr, resp.getlayer(IP).dst)
        asserts.assert_equal(CLIENT_PORT, resp.getlayer(UDP).dport)

    def _make_macos_10_13_3_paramrequestlist(self):
        """Build the parameter request list used by the macOS 10.13.3 tests."""
        return make_paramrequestlist_opt([
            'subnet_mask',
            121,  # Classless Static Route
            'router',
            'name_server',
            'domain',
            119,  # Domain Search
            252,  # Private/Proxy autodiscovery
            95,  # LDAP
            44,  # NetBIOS over TCP/IP Name Server
            46,  # NetBIOS over TCP/IP Node Type
        ])

    @test_tracker_info(uuid="bf05efe9-ee5b-46ba-9b3c-5a4441c13798")
    def test_macos_10_13_3_discover(self):
        """A macOS 10.13.3 style discover gets the standard offer."""
        req = self._make_discover(
            self.hwaddr,
            options=[
                self._make_macos_10_13_3_paramrequestlist(),
                ('max_dhcp_size', 1500),
                # HW type Ethernet (0x01)
                ('client_id', b'\x01' + self.hwaddr),
                ('lease_time', 7776000),
                ('hostname', b'test12-macbookpro'),
            ], opts_padding=bytes(6))
        req.getlayer(BOOTP).secs = 2
        resp = self._get_response(req)
        self._assert_offer(resp)
        self._assert_standard_offer_or_ack(resp)

    @test_tracker_info(uuid="7acc796b-c4f1-46cc-8ffb-0a0efb05ae86")
    def test_macos_10_13_3_request_selecting(self):
        """A macOS 10.13.3 style selecting request gets the standard ack."""
        req = self._make_request(
            self.hwaddr, None, None,
            options=[
                self._make_macos_10_13_3_paramrequestlist(),
                ('max_dhcp_size', 1500),
                # HW type Ethernet (0x01)
                ('client_id', b'\x01' + self.hwaddr),
                ('requested_addr', NETADDR_PREFIX + '109'),
                ('server_id', self.server_addr),
                ('hostname', b'test12-macbookpro'),
            ])
        req.getlayer(BOOTP).secs = 5
        resp = self._get_response(req)
        self._assert_ack(resp)
        self._assert_standard_offer_or_ack(resp)

    # Note: macOS does not seem to do any rebinding (straight to discover)
    @test_tracker_info(uuid="e8f0b60c-9ea3-4184-8426-151a395bff5b")
    def test_macos_10_13_3_request_renewing(self):
        """A macOS 10.13.3 style renewing request gets the standard ack."""
        req_ip = NETADDR_PREFIX + '109'
        req = self._make_request(
            self.hwaddr, None, None,
            ciaddr=req_ip, ip_src=req_ip, ip_dst=self.server_addr,
            options=[
                self._make_macos_10_13_3_paramrequestlist(),
                ('max_dhcp_size', 1500),
                # HW type Ethernet (0x01)
                ('client_id', b'\x01' + self.hwaddr),
                ('lease_time', 7776000),
                ('hostname', b'test12-macbookpro'),
            ], opts_padding=bytes(6))
        resp = self._get_response(req)
        self._assert_ack(resp)
        self._assert_standard_offer_or_ack(resp, renewing=True)

    def _make_win10_paramrequestlist(self):
        """Build the parameter request list used by the Windows 10 tests."""
        return make_paramrequestlist_opt([
            'subnet_mask',
            'router',
            'name_server',
            'domain',
            31,  # Perform Router Discover
            33,  # Static Route
            'vendor_specific',
            44,  # NetBIOS over TCP/IP Name Server
            46,  # NetBIOS over TCP/IP Node Type
            47,  # NetBIOS over TCP/IP Scope
            121,  # Classless Static Route
            249,  # Private/Classless Static Route (MS)
            252,  # Private/Proxy autodiscovery
        ])

    @test_tracker_info(uuid="11b3db9c-4cd7-4088-99dc-881f25ce4e76")
    def test_win10_discover(self):
        """A Windows 10 style discover gets the standard broadcast offer."""
        req = self._make_discover(
            self.hwaddr, bcastbit=True,
            options=[
                # HW type Ethernet (0x01)
                ('client_id', b'\x01' + self.hwaddr),
                ('hostname', b'test120-w'),
                ('vendor_class_id', b'MSFT 5.0'),
                self._make_win10_paramrequestlist(),
            ], opts_padding=bytes(11))
        req.getlayer(BOOTP).secs = 2
        resp = self._get_response(req)
        self._assert_offer(resp)
        self._assert_standard_offer_or_ack(resp, bcast=True)

    @test_tracker_info(uuid="4fe04e7f-c643-4a19-b15c-cf417b2c9410")
    def test_win10_request_selecting(self):
        """A Windows 10 style selecting request gets the standard ack."""
        req = self._make_request(
            self.hwaddr, None, None, bcastbit=True,
            options=[
                ('max_dhcp_size', 1500),
                # HW type Ethernet (0x01)
                ('client_id', b'\x01' + self.hwaddr),
                ('requested_addr', NETADDR_PREFIX + '109'),
                ('server_id', self.server_addr),
                ('hostname', b'test120-w'),
                # Client Fully Qualified Domain Name
                (81, b'\x00\x00\x00test120-w.ad.tst.example.com'),
                ('vendor_class_id', b'MSFT 5.0'),
                self._make_win10_paramrequestlist(),
            ])
        resp = self._get_response(req)
        self._assert_ack(resp)
        self._assert_standard_offer_or_ack(resp, bcast=True)

    def _run_win10_request_renewing(self, bcast):
        """Send a Windows 10 style renewing request and check the ack.

        Args:
            bcast: when true, set the broadcast bit and send the request to
                the broadcast address; otherwise send it to the server.
        """
        req_ip = NETADDR_PREFIX + '109'
        req = self._make_request(
            self.hwaddr, None, None, bcastbit=bcast,
            ciaddr=req_ip, ip_src=req_ip,
            ip_dst=NETADDR_BROADCAST if bcast else self.server_addr,
            options=[
                ('max_dhcp_size', 1500),
                # HW type Ethernet (0x01)
                ('client_id', b'\x01' + self.hwaddr),
                ('hostname', b'test120-w'),
                # Client Fully Qualified Domain Name
                (81, b'\x00\x00\x00test120-w.ad.tst.example.com'),
                ('vendor_class_id', b'MSFT 5.0'),
                self._make_win10_paramrequestlist(),
            ])
        resp = self._get_response(req)
        self._assert_ack(resp)
        self._assert_standard_offer_or_ack(resp, renewing=True, bcast=bcast)

    @test_tracker_info(uuid="1b23c9c7-cc94-42d0-83a6-f1b2bc125fb9")
    def test_win10_request_renewing(self):
        """Windows 10 style renewal sent to the server."""
        self._run_win10_request_renewing(bcast=False)

    @test_tracker_info(uuid="c846bd14-71fb-4492-a4d3-0aa5c2c35751")
    def test_win10_request_rebinding(self):
        """Windows 10 style renewal sent to the broadcast address."""
        self._run_win10_request_renewing(bcast=True)

    def _make_debian_paramrequestlist(self):
        """Build the parameter request list used by the Debian tests."""
        # NOTE(review): 'domain' and 'vendor_specific' below were annotated
        # "NetBIOS over TCP/IP Name Server" and "NetBIOS over TCP/IP Node
        # Type", which does not match the labels. Either the comments or the
        # labels are wrong; confirm against a dhclient 4.3.5 capture.
        return make_paramrequestlist_opt([
            'subnet_mask',
            'broadcast_address',
            'router',
            'name_server',
            119,  # Domain Search
            'hostname',
            101,  # TCode
            'domain',
            'vendor_specific',
            121,  # Classless Static Route
            249,  # Private/Classless Static Route (MS)
            33,  # Static Route
            252,  # Private/Proxy autodiscovery
            'NTP_server',
        ])

@test_tracker_info(uuid="b0bb6ae7-07e6-4ecb-9a2f-db9c8146a3d5") 784 def test_debian_dhclient_4_3_5_discover(self): 785 req_ip = NETADDR_PREFIX + '109' 786 req = self._make_discover(self.hwaddr, 787 options=[ 788 ('requested_addr', req_ip), 789 ('hostname', b'test12'), 790 self._make_debian_paramrequestlist(), 791 ], opts_padding=bytes(26)) 792 resp = self._get_response(req) 793 self._assert_offer(resp) 794 # Don't test for hostname option: previous implementation would not 795 # set it in offer, which was not consistent with ack 796 self._assert_standard_offer_or_ack(resp, ignore_hostname=True) 797 asserts.assert_equal(req_ip, get_yiaddr(resp)) 798 799 @test_tracker_info(uuid="d70bc043-84cb-4735-9123-c46c6d1ce5ac") 800 def test_debian_dhclient_4_3_5_request_selecting(self): 801 req = self._make_request(self.hwaddr, None, None, 802 options=[ 803 ('server_id', self.server_addr), 804 ('requested_addr', NETADDR_PREFIX + '109'), 805 ('hostname', b'test12'), 806 self._make_debian_paramrequestlist(), 807 ], opts_padding=bytes(20)) 808 resp = self._get_response(req) 809 self._assert_ack(resp) 810 self._assert_standard_offer_or_ack(resp, with_hostname=True) 811 812 def _run_debian_renewing(self, bcast): 813 req_ip = NETADDR_PREFIX + '109' 814 req = self._make_request(self.hwaddr, None, None, 815 ciaddr=req_ip, ip_src=req_ip, 816 ip_dst=NETADDR_BROADCAST if bcast else self.server_addr, 817 options=[ 818 ('hostname', b'test12'), 819 self._make_debian_paramrequestlist(), 820 ], 821 opts_padding=bytes(32)) 822 resp = self._get_response(req) 823 self._assert_ack(resp) 824 self._assert_standard_offer_or_ack(resp, renewing=True, 825 with_hostname=True) 826 827 @test_tracker_info(uuid="5e1e817d-9972-46ca-8d44-1e120bf1bafc") 828 def test_debian_dhclient_4_3_5_request_renewing(self): 829 self._run_debian_renewing(bcast=False) 830 831 @test_tracker_info(uuid="b179a36d-910e-4006-a79a-11cc561b69db") 832 def test_debian_dhclient_4_3_5_request_rebinding(self): 833 
self._run_debian_renewing(bcast=True) 834 835 def _assert_standard_offer_or_ack(self, resp, renewing=False, bcast=False, 836 ignore_hostname=False, with_hostname=False): 837 # Responses to renew/rebind are always unicast to ciaddr even with 838 # broadcast flag set (RFC does not define this behavior, but this is 839 # more efficient and matches previous behavior) 840 if bcast and not renewing: 841 self._assert_broadcast(resp) 842 self._assert_broadcastbit(resp, isset=True) 843 else: 844 # Previous implementation would set the broadcast flag but send a 845 # unicast reply if (bcast and renewing). This was not consistent and 846 # new implementation consistently clears the flag. Not testing for 847 # broadcast flag value to maintain compatibility. 848 self._assert_unicast(resp) 849 850 bootp_resp = resp.getlayer(BOOTP) 851 asserts.assert_equal(0, bootp_resp.hops) 852 if renewing: 853 asserts.assert_true(bootp_resp.ciaddr.startswith(NETADDR_PREFIX), 854 'ciaddr does not start with expected prefix') 855 else: 856 asserts.assert_equal(INET4_ANY, bootp_resp.ciaddr) 857 asserts.assert_true(bootp_resp.yiaddr.startswith(NETADDR_PREFIX), 858 'yiaddr does not start with expected prefix') 859 asserts.assert_true(bootp_resp.siaddr.startswith(NETADDR_PREFIX), 860 'siaddr does not start with expected prefix') 861 asserts.assert_equal(INET4_ANY, bootp_resp.giaddr) 862 863 opt_labels = get_opt_labels(bootp_resp) 864 # FQDN option 81 is not supported in new behavior 865 opt_labels = [opt for opt in opt_labels if opt != 81] 866 867 # Expect exactly these options in this order 868 expected_opts = [ 869 'message-type', 'server_id', 'lease_time', 'renewal_time', 870 'rebinding_time', 'subnet_mask', 'broadcast_address', 'router', 871 'name_server'] 872 if ignore_hostname: 873 opt_labels = [opt for opt in opt_labels if opt != 'hostname'] 874 elif with_hostname: 875 expected_opts.append('hostname') 876 expected_opts.extend(['vendor_specific', 'end']) 877 asserts.assert_equal(expected_opts, 
opt_labels) 878 879 def _request_address(self, hwaddr, bcast=True): 880 resp = self._get_response(self._make_discover(hwaddr)) 881 self._assert_offer(resp) 882 addr = get_yiaddr(resp) 883 siaddr = getopt(resp, 'server_id') 884 resp = self._get_response(self._make_request(hwaddr, addr, siaddr, 885 ip_dst=(INET4_ANY if bcast else siaddr))) 886 self._assert_ack(resp) 887 return addr, siaddr, resp 888 889 def _get_response(self, packet): 890 resp = srp1(packet, iface=self.iface, timeout=10, verbose=False) 891 bootp_resp = (resp or None) and resp.getlayer(BOOTP) 892 if bootp_resp != None and get_mess_type(bootp_resp) == ACK: 893 # Note down corresponding release for this request 894 release = self._make_release(bootp_resp.chaddr, bootp_resp.yiaddr, 895 getopt(bootp_resp, 'server_id')) 896 self.cleanup_releases.append(release) 897 return resp 898 899 def _send(self, packet): 900 sendp(packet, iface=self.iface, verbose=False) 901 902 def _assert_type(self, packet, tp): 903 asserts.assert_false(None == packet, "No packet") 904 asserts.assert_equal(tp, get_mess_type(packet)) 905 906 def _assert_ack(self, packet): 907 self._assert_type(packet, ACK) 908 909 def _assert_nak(self, packet): 910 self._assert_type(packet, NAK) 911 912 def _assert_offer(self, packet): 913 self._assert_type(packet, OFFER) 914 915 def _assert_broadcast(self, packet): 916 asserts.assert_false(None == packet, "No packet") 917 asserts.assert_equal(packet.getlayer(Ether).dst, BROADCAST_MAC) 918 asserts.assert_equal(packet.getlayer(IP).dst, NETADDR_BROADCAST) 919 self._assert_broadcastbit(packet) 920 921 def _assert_broadcastbit(self, packet, isset=True): 922 mask = 0x8000 923 flag = packet.getlayer(BOOTP).flags 924 asserts.assert_equal(flag & mask, mask if isset else 0) 925 926 def _assert_unicast(self, packet, ipAddr=None): 927 asserts.assert_false(None == packet, "No packet") 928 asserts.assert_false(packet.getlayer(Ether).dst == BROADCAST_MAC, 929 "Layer 2 packet destination address was broadcast") 
930 if ipAddr: 931 asserts.assert_equal(packet.getlayer(IP).dst, ipAddr) 932 933 def _assert_relayed(self, packet, giaddr): 934 self._assert_unicast(packet, giaddr) 935 asserts.assert_equal(giaddr, packet.getlayer(BOOTP).giaddr, 936 'Relayed response has invalid giaddr field') 937 938 def _next_hwaddr(self): 939 addr = make_hwaddr(self.next_hwaddr_index) 940 self.next_hwaddr_index = self.next_hwaddr_index + 1 941 return addr 942 943 def _make_dhcp(self, src_hwaddr, options, ciaddr=INET4_ANY, 944 ip_src=INET4_ANY, ip_dst=NETADDR_BROADCAST, giaddr=INET4_ANY, 945 bcastbit=False): 946 broadcast = (ip_dst == NETADDR_BROADCAST) 947 ethernet = Ether(dst=(BROADCAST_MAC if broadcast else self.dut_hwaddr)) 948 ip = IP(src=ip_src, dst=ip_dst) 949 udp = UDP(sport=68, dport=SERVER_PORT) 950 bootp = BOOTP(chaddr=src_hwaddr, ciaddr=ciaddr, giaddr=giaddr, 951 flags=(0x8000 if bcastbit else 0), xid=random.randrange(0, 2**32)) 952 dhcp = DHCP(options=options) 953 return ethernet / ip / udp / bootp / dhcp 954 955 def _make_discover(self, src_hwaddr, options = [], giaddr=INET4_ANY, 956 bcastbit=False, opts_padding=None, ip_src=INET4_ANY): 957 opts = [('message-type','discover')] 958 opts.extend(options) 959 opts.append('end') 960 if (opts_padding): 961 opts.append(opts_padding) 962 return self._make_dhcp(src_hwaddr, options=opts, giaddr=giaddr, 963 ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit, ip_src=ip_src) 964 965 def _make_request(self, src_hwaddr, reqaddr, siaddr, ciaddr=INET4_ANY, 966 ip_dst=None, ip_src=None, giaddr=INET4_ANY, bcastbit=False, 967 options=[], opts_padding=None): 968 if not ip_dst: 969 ip_dst = siaddr or INET4_ANY 970 971 if not ip_src and ip_dst == INET4_ANY: 972 ip_src = INET4_ANY 973 elif not ip_src: 974 ip_src = (giaddr if not isempty(giaddr) 975 else ciaddr if not isempty(ciaddr) 976 else reqaddr) 977 # Kernel will not receive unicast UDP packets with empty ip_src 978 asserts.assert_false(ip_dst != INET4_ANY and isempty(ip_src), 979 "Unicast ip_src cannot be 
zero") 980 opts = [('message-type', 'request')] 981 if options: 982 opts.extend(options) 983 else: 984 if siaddr: 985 opts.append(('server_id', siaddr)) 986 if reqaddr: 987 opts.append(('requested_addr', reqaddr)) 988 opts.append('end') 989 if opts_padding: 990 opts.append(opts_padding) 991 return self._make_dhcp(src_hwaddr, options=opts, ciaddr=ciaddr, 992 ip_src=ip_src, ip_dst=ip_dst, giaddr=giaddr, bcastbit=bcastbit) 993 994 def _make_release(self, src_hwaddr, addr, server_id): 995 opts = [('message-type', 'release'), ('server_id', server_id), 'end'] 996 return self._make_dhcp(src_hwaddr, opts, ciaddr=addr, ip_src=addr, 997 ip_dst=server_id) 998 999def assert_bootp_response(resp, req): 1000 bootp = resp.getlayer(BOOTP) 1001 asserts.assert_equal(2, bootp.op, 'Invalid BOOTP op') 1002 asserts.assert_equal(1, bootp.htype, 'Invalid BOOTP htype') 1003 asserts.assert_equal(6, bootp.hlen, 'Invalid BOOTP hlen') 1004 asserts.assert_equal(0, bootp.hops, 'Invalid BOOTP hops') 1005 asserts.assert_equal(req.getlayer(BOOTP).xid, bootp.xid, 'Invalid XID') 1006 return bootp 1007 1008 1009def make_paramrequestlist_opt(params): 1010 param_indexes = [DHCPRevOptions[opt][0] if isinstance(opt, str) else opt 1011 for opt in params] 1012 return tuple(['param_req_list'] + [ 1013 opt.to_bytes(1, byteorder='big') if isinstance(opt, int) else opt 1014 for opt in param_indexes]) 1015 1016 1017def isempty(addr): 1018 return not addr or addr == INET4_ANY 1019 1020 1021def setopt(packet, optname, val): 1022 dhcp = packet.getlayer(DHCP) 1023 if optname in get_opt_labels(dhcp): 1024 dhcp.options = [(optname, val) if opt[0] == optname else opt 1025 for opt in dhcp.options] 1026 else: 1027 # Add before the last option (last option is "end") 1028 dhcp.options.insert(len(dhcp.options) - 1, (optname, val)) 1029 1030 1031def getopt(packet, key): 1032 opts = [opt[1] for opt in packet.getlayer(DHCP).options if opt[0] == key] 1033 return opts[0] if opts else None 1034 1035 1036def removeopt(packet, key): 
1037 dhcp = packet.getlayer(DHCP) 1038 dhcp.options = [opt for opt in dhcp.options if opt[0] != key] 1039 1040 1041def get_opt_labels(packet): 1042 dhcp_resp = packet.getlayer(DHCP) 1043 # end option is a single string, not a tuple. 1044 return [opt if isinstance(opt, str) else opt[0] 1045 for opt in dhcp_resp.options if opt != 'pad'] 1046 1047 1048def get_yiaddr(packet): 1049 return packet.getlayer(BOOTP).yiaddr 1050 1051 1052def get_chaddr(packet): 1053 # We use Ethernet addresses. Ignore address padding 1054 return packet.getlayer(BOOTP).chaddr[:6] 1055 1056 1057def get_mess_type(packet): 1058 return getopt(packet, 'message-type') 1059 1060 1061def make_hwaddr(index): 1062 if index > 0xffff: 1063 raise ValueError("Address index out of range") 1064 return b'\x44\x85\x00\x00' + bytes([index >> 8, index & 0xff]) 1065 1066 1067def format_hwaddr(addr): 1068 return ':'.join(['%02x' % c for c in addr]) 1069