• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1from acts import asserts
2from acts import base_test
3from acts.controllers import android_device
4from acts.test_decorators import test_tracker_info
5
6from scapy.all import *
7from threading import Event
8from threading import Thread
9import random
10import time
11import warnings
12
13
# UDP ports used by DHCP clients and servers
CLIENT_PORT = 68
SERVER_PORT = 67
BROADCAST_MAC = 'ff:ff:ff:ff:ff:ff'
INET4_ANY = '0.0.0.0'
# Prefix the tests expect the server to assign addresses from
NETADDR_PREFIX = '192.168.42.'
# Prefix used by the tests for addresses outside the expected subnet
OTHER_NETADDR_PREFIX = '192.168.43.'
NETADDR_BROADCAST = '255.255.255.255'
SUBNET_BROADCAST = NETADDR_PREFIX + '255'


# DHCP message type codes (RFC 2132, option 53)
OFFER = 2
REQUEST = 3
ACK = 5
NAK = 6


# adb shell commands driving the PMC app on the device
pmc_base_cmd = (
    "am broadcast -a com.android.pmc.action.AUTOPOWER --es PowerAction ")
start_pmc_cmd = (
    "am start -S -n com.android.pmc/com.android.pmc.PMCMainActivity")
pmc_start_usb_tethering_cmd = pmc_base_cmd + "StartUSBTethering"
pmc_stop_usb_tethering_cmd = pmc_base_cmd + "StopUSBTethering"
36
37
38class DhcpServerTest(base_test.BaseTestClass):
39    def setup_class(self):
40        self.dut = self.android_devices[0]
41        self.USB_TETHERED = False
42        self.next_hwaddr_index = 0
43        self.stop_arp = Event()
44
45        conf.checkIPaddr = 0
46        conf.checkIPsrc = 0
47        # Allow using non-67 server ports as long as client uses 68
48        bind_layers(UDP, BOOTP, dport=CLIENT_PORT)
49
50        self.dut.adb.shell(start_pmc_cmd)
51        self.dut.adb.shell("setprop log.tag.PMC VERBOSE")
52        iflist_before = get_if_list()
53        self._start_usb_tethering(self.dut)
54        self.iface = self._wait_for_new_iface(iflist_before)
55        self.real_hwaddr = get_if_raw_hwaddr(self.iface)
56
57        # Start a thread to answer to all ARP "who-has"
58        thread = Thread(target=self._sniff_arp, args=(self.stop_arp,))
59        thread.start()
60
61        # Discover server IP and device hwaddr
62        hwaddr = self._next_hwaddr()
63        resp = self._get_response(self._make_discover(hwaddr))
64        asserts.assert_false(None == resp,
65            "Device did not reply to first DHCP discover")
66        self.server_addr = getopt(resp, 'server_id')
67        self.dut_hwaddr = resp.getlayer(Ether).src
68        asserts.assert_false(None == self.server_addr,
69            "DHCP server did not specify server identifier")
70        # Ensure that we don't depend on assigned route/gateway on the host
71        conf.route.add(host=self.server_addr, dev=self.iface, gw="0.0.0.0")
72
73    def setup_test(self):
74        # Some versions of scapy do not close the receive file properly
75        warnings.filterwarnings("ignore", category=ResourceWarning)
76
77        bind_layers(UDP, BOOTP, dport=68)
78        self.hwaddr = self._next_hwaddr()
79        self.other_hwaddr = self._next_hwaddr()
80        self.cleanup_releases = []
81
82    def teardown_test(self):
83        for packet in self.cleanup_releases:
84            self._send(packet)
85
86    def teardown_class(self):
87        self.stop_arp.set()
88        self._stop_usb_tethering(self.dut)
89
90    def on_fail(self, test_name, begin_time):
91        self.dut.take_bug_report(test_name, begin_time)
92
93    def _start_usb_tethering(self, dut):
94        """ Start USB tethering
95
96        Args:
97            1. dut - ad object
98        """
99        self.log.info("Starting USB Tethering")
100        dut.stop_services()
101        dut.adb.shell(pmc_start_usb_tethering_cmd)
102        self._wait_for_device(self.dut)
103        self.USB_TETHERED = True
104
105    def _stop_usb_tethering(self, dut):
106        """ Stop USB tethering
107
108        Args:
109            1. dut - ad object
110        """
111        self.log.info("Stopping USB Tethering")
112        dut.adb.shell(pmc_stop_usb_tethering_cmd)
113        self._wait_for_device(self.dut)
114        dut.start_services()
115        self.USB_TETHERED = False
116
117    def _wait_for_device(self, dut):
118        """ Wait for device to come back online
119
120        Args:
121            1. dut - ad object
122        """
123        while dut.serial not in android_device.list_adb_devices():
124            pass
125        dut.adb.wait_for_device()
126
127    def _wait_for_new_iface(self, old_ifaces):
128        old_set = set(old_ifaces)
129        # Try 10 times to find a new interface with a 1s sleep every time
130        # (equivalent to a 9s timeout)
131        for i in range(0, 10):
132            new_ifaces = set(get_if_list()) - old_set
133            asserts.assert_true(len(new_ifaces) < 2,
134                "Too many new interfaces after turning on tethering")
135            if len(new_ifaces) == 1:
136                return new_ifaces.pop()
137            time.sleep(1)
138        asserts.fail("Timeout waiting for tethering interface on host")
139
140    def _sniff_arp(self, stop_arp):
141        try:
142            sniff(iface=self.iface, filter='arp', prn=self._handle_arp, store=0,
143                stop_filter=lambda p: stop_arp.is_set())
144        except:
145            # sniff may raise when tethering is disconnected. Ignore
146            # exceptions if stop was requested.
147            if not stop_arp.is_set():
148                raise
149
150    def _handle_arp(self, packet):
151        # Reply to all arp "who-has": say we have everything
152        if packet[ARP].op == ARP.who_has:
153            reply = ARP(op=ARP.is_at, hwsrc=self.real_hwaddr, psrc=packet.pdst,
154                hwdst=BROADCAST_MAC, pdst=SUBNET_BROADCAST)
155            sendp(Ether(dst=BROADCAST_MAC, src=self.real_hwaddr) / reply,
156                iface=self.iface, verbose=False)
157
158    @test_tracker_info(uuid="a8712769-977a-4ee1-902f-90b3ba30b40c")
159    def test_config_assumptions(self):
160        resp = self._get_response(self._make_discover(self.hwaddr))
161        asserts.assert_false(None == resp, "Device did not reply to discover")
162        asserts.assert_true(get_yiaddr(resp).startswith(NETADDR_PREFIX),
163            "Server does not use expected prefix")
164
165    @test_tracker_info(uuid="e3761689-7d64-46b1-97ce-15f315eaf568")
166    def test_discover_broadcastbit(self):
167        resp = self._get_response(
168            self._make_discover(self.hwaddr, bcastbit=True))
169        self._assert_offer(resp)
170        self._assert_broadcast(resp)
171
172    @test_tracker_info(uuid="30a7ea7c-c20f-4c46-aaf2-96f19d8f8191")
173    def test_discover_bootpfields(self):
174        discover = self._make_discover(self.hwaddr)
175        resp = self._get_response(discover)
176        self._assert_offer(resp)
177        self._assert_unicast(resp)
178        bootp = assert_bootp_response(resp, discover)
179        asserts.assert_equal(INET4_ANY, bootp.ciaddr)
180        asserts.assert_equal(self.server_addr, bootp.siaddr)
181        asserts.assert_equal(INET4_ANY, bootp.giaddr)
182        asserts.assert_equal(self.hwaddr, get_chaddr(bootp))
183
184    @test_tracker_info(uuid="593f4051-516d-44fa-8834-7d485362f182")
185    def test_discover_relayed_broadcastbit(self):
186        giaddr = NETADDR_PREFIX + '123'
187        resp = self._get_response(
188            self._make_discover(self.hwaddr, giaddr=giaddr, bcastbit=True))
189        self._assert_offer(resp)
190        self._assert_relayed(resp, giaddr)
191        self._assert_broadcastbit(resp)
192
193    def _run_discover_paramrequestlist(self, params, unwanted_params):
194        params_opt = make_paramrequestlist_opt(params)
195        resp = self._get_response(
196            self._make_discover(self.hwaddr, options=[params_opt]))
197
198        self._assert_offer(resp)
199        # List of requested params in response order
200        resp_opts = get_opt_labels(resp)
201        resp_requested_opts = [opt for opt in resp_opts if opt in params]
202        # All above params should be supported, order should be conserved
203        asserts.assert_equal(params, resp_requested_opts)
204        asserts.assert_equal(0, len(set(resp_opts) & set(unwanted_params)))
205        return resp
206
207    @test_tracker_info(uuid="00a8a3f6-f143-47ff-a79b-482c607fb5b8")
208    def test_discover_paramrequestlist(self):
209        resp = self._run_discover_paramrequestlist(
210            ['subnet_mask', 'broadcast_address', 'router', 'name_server'],
211            unwanted_params=[])
212        for opt in ['broadcast_address', 'router', 'name_server']:
213            asserts.assert_true(getopt(resp, opt).startswith(NETADDR_PREFIX),
214                opt + ' does not start with ' + NETADDR_PREFIX)
215
216        subnet_mask = getopt(resp, 'subnet_mask')
217        asserts.assert_true(subnet_mask.startswith('255.255.'),
218            'Unexpected subnet mask for /16+: ' + subnet_mask)
219
220    @test_tracker_info(uuid="d1aad4a3-9eab-4900-aa6a-5b82a4a64f46")
221    def test_discover_paramrequestlist_rev(self):
222        # RFC2132 #9.8: "The DHCP server is not required to return the options
223        # in the requested order, but MUST try to insert the requested options
224        # in the order requested"
225        asserts.skip('legacy behavior not compliant: fixed order used')
226        self._run_discover_paramrequestlist(
227            ['name_server', 'router', 'broadcast_address', 'subnet_mask'],
228            unwanted_params=[])
229
230    @test_tracker_info(uuid="e3ae6335-8cc7-4bf1-bb58-67646b727f2b")
231    def test_discover_paramrequestlist_unwanted(self):
232        asserts.skip('legacy behavior always sends all parameters')
233        self._run_discover_paramrequestlist(['router', 'name_server'],
234            unwanted_params=['broadcast_address', 'subnet_mask'])
235
236    def _assert_renews(self, request, addr, exp_time, resp_type=ACK):
237        # Sleep to test lease time renewal
238        time.sleep(3)
239        resp = self._get_response(request)
240        self._assert_type(resp, resp_type)
241        asserts.assert_equal(addr, get_yiaddr(resp))
242        remaining_lease = getopt(resp, 'lease_time')
243        # Lease renewed: waited for 3s, lease time not decreased by more than 2
244        asserts.assert_true(remaining_lease >= exp_time - 2,
245            'Lease not renewed')
246        # Lease times should be consistent across offers/renewals
247        asserts.assert_true(remaining_lease <= exp_time + 2,
248            'Lease time inconsistent')
249        return resp
250
251    @test_tracker_info(uuid="d6b598b7-f443-4b5a-ba80-4af5d211cade")
252    def test_discover_assigned_ownaddress(self):
253        addr, siaddr, resp = self._request_address(self.hwaddr)
254
255        lease_time = getopt(resp, 'lease_time')
256        server_id = getopt(resp, 'server_id')
257        asserts.assert_true(lease_time >= 60, "Lease time is too short")
258        asserts.assert_false(addr == INET4_ANY, "Assigned address is empty")
259        # Wait to test lease expiration time change
260        time.sleep(3)
261
262        # New discover, same address
263        resp = self._assert_renews(self._make_discover(self.hwaddr),
264            addr, lease_time, resp_type=OFFER)
265        self._assert_unicast(resp, get_yiaddr(resp))
266        self._assert_broadcastbit(resp, isset=False)
267
268    @test_tracker_info(uuid="cbb07d77-912b-4269-bbbc-adba99779587")
269    def test_discover_assigned_otherhost(self):
270        addr, siaddr, _ = self._request_address(self.hwaddr)
271
272        # New discover, same address, different client
273        resp = self._get_response(self._make_discover(self.other_hwaddr,
274            [('requested_addr', addr)]))
275
276        self._assert_offer(resp)
277        asserts.assert_false(get_yiaddr(resp) == addr,
278            "Already assigned address offered")
279        self._assert_unicast(resp, get_yiaddr(resp))
280        self._assert_broadcastbit(resp, isset=False)
281
282    @test_tracker_info(uuid="3d2b3d2f-eb5f-498f-b887-3b4638cebf14")
283    def test_discover_requestaddress(self):
284        addr = NETADDR_PREFIX + '200'
285        resp = self._get_response(self._make_discover(self.hwaddr,
286            [('requested_addr', addr)]))
287        self._assert_offer(resp)
288        asserts.assert_equal(get_yiaddr(resp), addr)
289
290        # Lease not committed: can request again
291        resp = self._get_response(self._make_discover(self.other_hwaddr,
292            [('requested_addr', addr)]))
293        self._assert_offer(resp)
294        asserts.assert_equal(get_yiaddr(resp), addr)
295
296    @test_tracker_info(uuid="5ffd9d25-304e-434b-bedb-56ccf27dcebd")
297    def test_discover_requestaddress_wrongsubnet(self):
298        addr = OTHER_NETADDR_PREFIX + '200'
299        resp = self._get_response(
300            self._make_discover(self.hwaddr, [('requested_addr', addr)]))
301        self._assert_offer(resp)
302        self._assert_unicast(resp)
303        asserts.assert_false(get_yiaddr(resp) == addr,
304            'Server offered invalid address')
305
306    @test_tracker_info(uuid="f7d6a92f-9386-4b65-b6c1-d0a3f11213bf")
307    def test_discover_giaddr_outside_subnet(self):
308        giaddr = OTHER_NETADDR_PREFIX + '201'
309        resp = self._get_response(
310            self._make_discover(self.hwaddr, giaddr=giaddr))
311        asserts.assert_equal(resp, None)
312
313    @test_tracker_info(uuid="1348c79a-9203-4bb8-b33b-af80bacd17b1")
314    def test_discover_srcaddr_outside_subnet(self):
315        srcaddr = OTHER_NETADDR_PREFIX + '200'
316        resp = self._get_response(
317            self._make_discover(self.hwaddr, ip_src=srcaddr))
318        self._assert_offer(resp)
319        asserts.assert_false(srcaddr == get_yiaddr(resp),
320            'Server offered invalid address')
321
322    @test_tracker_info(uuid="a03bb783-8665-4c66-9c0c-1bb02ddca07e")
323    def test_discover_requestaddress_giaddr_outside_subnet(self):
324        addr = NETADDR_PREFIX + '200'
325        giaddr = OTHER_NETADDR_PREFIX + '201'
326        req = self._make_discover(self.hwaddr, [('requested_addr', addr)],
327                ip_src=giaddr, giaddr=giaddr)
328        resp = self._get_response(req)
329        asserts.assert_equal(resp, None)
330
331    @test_tracker_info(uuid="725956af-71e2-45d8-b8b3-402d21bfc7db")
332    def test_discover_knownaddress_giaddr_outside_subnet(self):
333        addr, siaddr, _ = self._request_address(self.hwaddr)
334
335        # New discover, same client, through relay in invalid subnet
336        giaddr = OTHER_NETADDR_PREFIX + '200'
337        resp = self._get_response(
338            self._make_discover(self.hwaddr, giaddr=giaddr))
339        asserts.assert_equal(resp, None)
340
341    @test_tracker_info(uuid="2ee9d5b1-c15d-40c4-98e9-63202d1f1557")
342    def test_discover_knownaddress_giaddr_valid_subnet(self):
343        addr, siaddr, _ = self._request_address(self.hwaddr)
344
345        # New discover, same client, through relay in valid subnet
346        giaddr = NETADDR_PREFIX + '200'
347        resp = self._get_response(
348            self._make_discover(self.hwaddr, giaddr=giaddr))
349        self._assert_offer(resp)
350        self._assert_unicast(resp, giaddr)
351        self._assert_broadcastbit(resp, isset=False)
352
353    @test_tracker_info(uuid="f43105a5-633a-417a-8a07-39bc36c493e7")
354    def test_request_unicast(self):
355        addr, siaddr, resp = self._request_address(self.hwaddr, bcast=False)
356        self._assert_unicast(resp, addr)
357
358    @test_tracker_info(uuid="09f3c1c4-1202-4f85-a965-4d86aee069f3")
359    def test_request_bootpfields(self):
360        req_addr = NETADDR_PREFIX + '200'
361        req = self._make_request(self.hwaddr, req_addr, self.server_addr)
362        resp = self._get_response(req)
363        self._assert_ack(resp)
364        bootp = assert_bootp_response(resp, req)
365        asserts.assert_equal(INET4_ANY, bootp.ciaddr)
366        asserts.assert_equal(self.server_addr, bootp.siaddr)
367        asserts.assert_equal(INET4_ANY, bootp.giaddr)
368        asserts.assert_equal(self.hwaddr, get_chaddr(bootp))
369
370    @test_tracker_info(uuid="ec00d268-80cb-4be5-9771-2292cc7d2e18")
371    def test_request_selecting_inuse(self):
372        addr, siaddr, _ = self._request_address(self.hwaddr)
373        new_req = self._make_request(self.other_hwaddr, addr, siaddr)
374        resp = self._get_response(new_req)
375        self._assert_nak(resp)
376        self._assert_broadcast(resp)
377        bootp = assert_bootp_response(resp, new_req)
378        asserts.assert_equal(INET4_ANY, bootp.ciaddr)
379        asserts.assert_equal(INET4_ANY, bootp.yiaddr)
380        asserts.assert_equal(INET4_ANY, bootp.siaddr)
381        asserts.assert_equal(INET4_ANY, bootp.giaddr)
382        asserts.assert_equal(self.other_hwaddr, get_chaddr(bootp))
383        asserts.assert_equal(
384            ['message-type', 'server_id', 56, 'end'], # 56 is "message" opt
385            get_opt_labels(bootp))
386        asserts.assert_equal(self.server_addr, getopt(bootp, 'server_id'))
387
388    @test_tracker_info(uuid="0643c179-3542-4297-9b06-8d86ff785e9c")
389    def test_request_selecting_wrongsiaddr(self):
390        addr = NETADDR_PREFIX + '200'
391        wrong_siaddr = NETADDR_PREFIX + '201'
392        asserts.assert_false(wrong_siaddr == self.server_addr,
393            'Test assumption not met: server addr is ' + wrong_siaddr)
394        resp = self._get_response(
395            self._make_request(self.hwaddr, addr, siaddr=wrong_siaddr))
396        asserts.assert_true(resp == None,
397            'Received response for request with incorrect siaddr')
398
399    @test_tracker_info(uuid="676beab2-4af8-4bf0-a4ad-c7626ae5987f")
400    def test_request_selecting_giaddr_outside_subnet(self):
401        addr = NETADDR_PREFIX + '200'
402        giaddr = OTHER_NETADDR_PREFIX + '201'
403        resp = self._get_response(
404            self._make_request(self.hwaddr, addr, siaddr=self.server_addr,
405                giaddr=giaddr))
406        asserts.assert_equal(resp, None)
407
408    @test_tracker_info(uuid="fe17df0c-2f41-416f-bb76-d75b74b63c0f")
409    def test_request_selecting_hostnameupdate(self):
410        addr = NETADDR_PREFIX + '123'
411        hostname1 = b'testhostname1'
412        hostname2 = b'testhostname2'
413        req = self._make_request(self.hwaddr, None, None,
414            options=[
415                ('requested_addr', addr),
416                ('server_id', self.server_addr),
417                ('hostname', hostname1)])
418        resp = self._get_response(req)
419        self._assert_ack(resp)
420        self._assert_unicast(resp, addr)
421        asserts.assert_equal(hostname1, getopt(req, 'hostname'))
422
423        # Re-request with different hostname
424        setopt(req, 'hostname', hostname2)
425        resp = self._get_response(req)
426        self._assert_ack(resp)
427        self._assert_unicast(resp, addr)
428        asserts.assert_equal(hostname2, getopt(req, 'hostname'))
429
430    def _run_initreboot(self, bcastbit):
431        addr, siaddr, resp = self._request_address(self.hwaddr)
432        exp = getopt(resp, 'lease_time')
433
434        # init-reboot: siaddr is None
435        return self._assert_renews(self._make_request(
436                self.hwaddr, addr, siaddr=None, bcastbit=bcastbit), addr, exp)
437
438    @test_tracker_info(uuid="263c91b9-cfe9-4f21-985d-b7046df80528")
439    def test_request_initreboot(self):
440        resp = self._run_initreboot(bcastbit=False)
441        self._assert_unicast(resp)
442        self._assert_broadcastbit(resp, isset=False)
443
444    @test_tracker_info(uuid="f05dd60f-03dd-4e2b-8e58-80f4d752ad51")
445    def test_request_initreboot_broadcastbit(self):
446        resp = self._run_initreboot(bcastbit=True)
447        self._assert_broadcast(resp)
448
449    @test_tracker_info(uuid="5563c616-2136-47f6-9151-4e28cbfe797c")
450    def test_request_initreboot_nolease(self):
451        # RFC2131 #4.3.2
452        asserts.skip("legacy behavior not compliant")
453        addr = NETADDR_PREFIX + '123'
454        resp = self._get_response(self._make_request(self.hwaddr, addr, None))
455        asserts.assert_equal(resp, None)
456
457    @test_tracker_info(uuid="da5c5537-cb38-4a2e-828f-44bc97976fe5")
458    def test_request_initreboot_incorrectlease(self):
459        otheraddr = NETADDR_PREFIX + '123'
460        addr, siaddr, _ = self._request_address(self.hwaddr)
461        asserts.assert_false(addr == otheraddr,
462            "Test assumption not met: server assigned " + otheraddr)
463
464        resp = self._get_response(
465            self._make_request(self.hwaddr, otheraddr, siaddr=None))
466        self._assert_nak(resp)
467        self._assert_broadcast(resp)
468
469    @test_tracker_info(uuid="ce42ba57-07be-427b-9cbd-5535c62b0120")
470    def test_request_initreboot_wrongnet(self):
471        resp = self._get_response(self._make_request(self.hwaddr,
472            OTHER_NETADDR_PREFIX + '1', siaddr=None))
473        self._assert_nak(resp)
474        self._assert_broadcast(resp)
475
476    def _run_rebinding(self, bcastbit, giaddr=INET4_ANY):
477        addr, siaddr, resp = self._request_address(self.hwaddr)
478        exp = getopt(resp, 'lease_time')
479
480        # Rebinding: no siaddr or reqaddr
481        resp = self._assert_renews(
482            self._make_request(self.hwaddr, reqaddr=None, siaddr=None,
483                ciaddr=addr, giaddr=giaddr, ip_src=addr,
484                ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit),
485            addr, exp)
486        return resp, addr
487
488    @test_tracker_info(uuid="68bfcb25-5873-41ad-ad0a-bf22781534ca")
489    def test_request_rebinding(self):
490        resp, addr = self._run_rebinding(bcastbit=False)
491        self._assert_unicast(resp, addr)
492        self._assert_broadcastbit(resp, isset=False)
493
494    @test_tracker_info(uuid="4c591536-8062-40ec-ae12-1ebe7dcad8e2")
495    def test_request_rebinding_relayed(self):
496        giaddr = NETADDR_PREFIX + '123'
497        resp, _ = self._run_rebinding(bcastbit=False, giaddr=giaddr)
498        self._assert_relayed(resp, giaddr)
499        self._assert_broadcastbit(resp, isset=False)
500
501    @test_tracker_info(uuid="cee2668b-bd79-47d7-b358-8f9387d715b1")
502    def test_request_rebinding_inuse(self):
503        addr, siaddr, _ = self._request_address(self.hwaddr)
504
505        resp = self._get_response(self._make_request(
506                self.other_hwaddr, reqaddr=None, siaddr=None, ciaddr=addr))
507        self._assert_nak(resp)
508        self._assert_broadcast(resp)
509
510    @test_tracker_info(uuid="d95d69b5-ab9a-42f5-8dd0-b9b6a6d960cc")
511    def test_request_rebinding_wrongaddr(self):
512        otheraddr = NETADDR_PREFIX + '123'
513        addr, siaddr, _ = self._request_address(self.hwaddr)
514        asserts.assert_false(addr == otheraddr,
515            "Test assumption not met: server assigned " + otheraddr)
516
517        resp = self._get_response(self._make_request(
518            self.hwaddr, reqaddr=None, siaddr=siaddr, ciaddr=otheraddr))
519        self._assert_nak(resp)
520        self._assert_broadcast(resp)
521
522    @test_tracker_info(uuid="421a86b3-8779-4910-8050-7806536efabb")
523    def test_request_rebinding_wrongaddr_relayed(self):
524        otheraddr = NETADDR_PREFIX + '123'
525        relayaddr = NETADDR_PREFIX + '124'
526        addr, siaddr, _ = self._request_address(self.hwaddr)
527        asserts.assert_false(addr == otheraddr,
528            "Test assumption not met: server assigned " + otheraddr)
529        asserts.assert_false(addr == relayaddr,
530            "Test assumption not met: server assigned " + relayaddr)
531
532        req = self._make_request(self.hwaddr, reqaddr=None, siaddr=None,
533            ciaddr=otheraddr, giaddr=relayaddr)
534
535        resp = self._get_response(req)
536        self._assert_nak(resp)
537        self._assert_relayed(resp, relayaddr)
538        self._assert_broadcastbit(resp)
539
540    @test_tracker_info(uuid="6ff1fab4-009a-4758-9153-0d9db63423da")
541    def test_release(self):
542        addr, siaddr, _ = self._request_address(self.hwaddr)
543        # Re-requesting fails
544        resp = self._get_response(
545            self._make_request(self.other_hwaddr, addr, siaddr))
546        self._assert_nak(resp)
547        self._assert_broadcast(resp)
548
549        # Succeeds after release
550        self._send(self._make_release(self.hwaddr, addr, siaddr))
551        resp = self._get_response(
552            self._make_request(self.other_hwaddr, addr, siaddr))
553        self._assert_ack(resp)
554
555    @test_tracker_info(uuid="abb1a53e-6b6c-468f-88b9-ace9ca4d6593")
556    def test_release_noserverid(self):
557        addr, siaddr, _ = self._request_address(self.hwaddr)
558
559        # Release without server_id opt is ignored
560        release = self._make_release(self.hwaddr, addr, siaddr)
561        removeopt(release, 'server_id')
562        self._send(release)
563
564        # Not released: request fails
565        resp = self._get_response(
566            self._make_request(self.other_hwaddr, addr, siaddr))
567        self._assert_nak(resp)
568        self._assert_broadcast(resp)
569
570    @test_tracker_info(uuid="8415b69e-ae61-4474-8495-d783ba6818d1")
571    def test_release_wrongserverid(self):
572        addr, siaddr, _ = self._request_address(self.hwaddr)
573
574        # Release with wrong server id
575        release = self._make_release(self.hwaddr, addr, siaddr)
576        setopt(release, 'server_id', addr)
577        self._send(release)
578
579        # Not released: request fails
580        resp = self._get_response(
581            self._make_request(self.other_hwaddr, addr, siaddr))
582        self._assert_nak(resp)
583        self._assert_broadcast(resp)
584
585    @test_tracker_info(uuid="0858f678-3db2-4c12-a21b-6e16c5d7e7ce")
586    def test_unicast_l2l3(self):
587        reqAddr = NETADDR_PREFIX + '124'
588        resp = self._get_response(self._make_request(
589            self.hwaddr, reqAddr, siaddr=None))
590        self._assert_unicast(resp)
591        str_hwaddr = format_hwaddr(self.hwaddr)
592        asserts.assert_equal(str_hwaddr, resp.getlayer(Ether).dst)
593        asserts.assert_equal(reqAddr, resp.getlayer(IP).dst)
594        asserts.assert_equal(CLIENT_PORT, resp.getlayer(UDP).dport)
595
    # NOTE(review): a second method with the same name (and the same
    # test_tracker_info uuid) is defined further down in this class. The
    # later definition replaces this one when the class body is executed, so
    # this version never runs. It also differs from the later one: it calls
    # _assert_standard_offer instead of _assert_standard_offer_or_ack and
    # builds its own parameter request list. Confirm which one is intended
    # and remove the other.
    @test_tracker_info(uuid="bf05efe9-ee5b-46ba-9b3c-5a4441c13798")
    def test_macos_10_13_3_discover(self):
        params_opt = make_paramrequestlist_opt([
            'subnet_mask',
            121, # Classless Static Route
            'router',
            'name_server',
            'domain',
            119, # Domain Search
            252, # Private/Proxy autodiscovery
            95, # LDAP
            'NetBIOS_server',
            46, # NetBIOS over TCP/IP Node Type
            ])
        req = self._make_discover(self.hwaddr,
            options=[
                params_opt,
                ('max_dhcp_size', 1500),
                # HW type Ethernet (0x01)
                ('client_id', b'\x01' + self.hwaddr),
                ('lease_time', 7776000),
                ('hostname', b'test12-macbookpro'),
            ], opts_padding=bytes(6))
        req.getlayer(BOOTP).secs = 2
        resp = self._get_response(req)
        self._assert_standard_offer(resp)
622
623    def _make_macos_10_13_3_paramrequestlist(self):
624        return make_paramrequestlist_opt([
625            'subnet_mask',
626            121, # Classless Static Route
627            'router',
628            'name_server',
629            'domain',
630            119, # Domain Search
631            252, # Private/Proxy autodiscovery
632            95, # LDAP
633            44, # NetBIOS over TCP/IP Name Server
634            46, # NetBIOS over TCP/IP Node Type
635            ])
636
637    @test_tracker_info(uuid="bf05efe9-ee5b-46ba-9b3c-5a4441c13798")
638    def test_macos_10_13_3_discover(self):
639        req = self._make_discover(self.hwaddr,
640            options=[
641                self._make_macos_10_13_3_paramrequestlist(),
642                ('max_dhcp_size', 1500),
643                # HW type Ethernet (0x01)
644                ('client_id', b'\x01' + self.hwaddr),
645                ('lease_time', 7776000),
646                ('hostname', b'test12-macbookpro'),
647            ], opts_padding=bytes(6))
648        req.getlayer(BOOTP).secs = 2
649        resp = self._get_response(req)
650        self._assert_offer(resp)
651        self._assert_standard_offer_or_ack(resp)
652
653    @test_tracker_info(uuid="7acc796b-c4f1-46cc-8ffb-0a0efb05ae86")
654    def test_macos_10_13_3_request_selecting(self):
655        req = self._make_request(self.hwaddr, None, None,
656            options=[
657                self._make_macos_10_13_3_paramrequestlist(),
658                ('max_dhcp_size', 1500),
659                # HW type Ethernet (0x01)
660                ('client_id', b'\x01' + self.hwaddr),
661                ('requested_addr', NETADDR_PREFIX + '109'),
662                ('server_id', self.server_addr),
663                ('hostname', b'test12-macbookpro'),
664            ])
665        req.getlayer(BOOTP).secs = 5
666        resp = self._get_response(req)
667        self._assert_ack(resp)
668        self._assert_standard_offer_or_ack(resp)
669
670    # Note: macOS does not seem to do any rebinding (straight to discover)
671    @test_tracker_info(uuid="e8f0b60c-9ea3-4184-8426-151a395bff5b")
672    def test_macos_10_13_3_request_renewing(self):
673        req_ip = NETADDR_PREFIX + '109'
674        req = self._make_request(self.hwaddr, None, None,
675            ciaddr=req_ip, ip_src=req_ip, ip_dst=self.server_addr, options=[
676                self._make_macos_10_13_3_paramrequestlist(),
677                ('max_dhcp_size', 1500),
678                # HW type Ethernet (0x01)
679                ('client_id', b'\x01' + self.hwaddr),
680                ('lease_time', 7776000),
681                ('hostname', b'test12-macbookpro'),
682            ], opts_padding=bytes(6))
683        resp = self._get_response(req)
684        self._assert_ack(resp)
685        self._assert_standard_offer_or_ack(resp, renewing=True)
686
687    def _make_win10_paramrequestlist(self):
688        return make_paramrequestlist_opt([
689            'subnet_mask',
690            'router',
691            'name_server',
692            'domain',
693            31, # Perform Router Discover
694            33, # Static Route
695            'vendor_specific',
696            44, # NetBIOS over TCP/IP Name Server
697            46, # NetBIOS over TCP/IP Node Type
698            47, # NetBIOS over TCP/IP Scope
699            121, # Classless Static Route
700            249, # Private/Classless Static Route (MS)
701            252, # Private/Proxy autodiscovery
702            ])
703
704    @test_tracker_info(uuid="11b3db9c-4cd7-4088-99dc-881f25ce4e76")
705    def test_win10_discover(self):
706        req = self._make_discover(self.hwaddr, bcastbit=True,
707            options=[
708                # HW type Ethernet (0x01)
709                ('client_id', b'\x01' + self.hwaddr),
710                ('hostname', b'test120-w'),
711                ('vendor_class_id', b'MSFT 5.0'),
712                self._make_win10_paramrequestlist(),
713            ], opts_padding=bytes(11))
714        req.getlayer(BOOTP).secs = 2
715        resp = self._get_response(req)
716        self._assert_offer(resp)
717        self._assert_standard_offer_or_ack(resp, bcast=True)
718
719    @test_tracker_info(uuid="4fe04e7f-c643-4a19-b15c-cf417b2c9410")
720    def test_win10_request_selecting(self):
721        req = self._make_request(self.hwaddr, None, None, bcastbit=True,
722            options=[
723                ('max_dhcp_size', 1500),
724                # HW type Ethernet (0x01)
725                ('client_id', b'\x01' + self.hwaddr),
726                ('requested_addr', NETADDR_PREFIX + '109'),
727                ('server_id', self.server_addr),
728                ('hostname', b'test120-w'),
729                # Client Fully Qualified Domain Name
730                (81, b'\x00\x00\x00test120-w.ad.tst.example.com'),
731                ('vendor_class_id', b'MSFT 5.0'),
732                self._make_win10_paramrequestlist(),
733            ])
734        resp = self._get_response(req)
735        self._assert_ack(resp)
736        self._assert_standard_offer_or_ack(resp, bcast=True)
737
738    def _run_win10_request_renewing(self, bcast):
739        req_ip = NETADDR_PREFIX + '109'
740        req = self._make_request(self.hwaddr, None, None, bcastbit=bcast,
741            ciaddr=req_ip, ip_src=req_ip,
742            ip_dst=NETADDR_BROADCAST if bcast else self.server_addr,
743            options=[
744                ('max_dhcp_size', 1500),
745                # HW type Ethernet (0x01)
746                ('client_id', b'\x01' + self.hwaddr),
747                ('hostname', b'test120-w'),
748                # Client Fully Qualified Domain Name
749                (81, b'\x00\x00\x00test120-w.ad.tst.example.com'),
750                ('vendor_class_id', b'MSFT 5.0'),
751                self._make_win10_paramrequestlist(),
752            ])
753        resp = self._get_response(req)
754        self._assert_ack(resp)
755        self._assert_standard_offer_or_ack(resp, renewing=True, bcast=bcast)
756
757    @test_tracker_info(uuid="1b23c9c7-cc94-42d0-83a6-f1b2bc125fb9")
758    def test_win10_request_renewing(self):
759        self._run_win10_request_renewing(bcast=False)
760
761    @test_tracker_info(uuid="c846bd14-71fb-4492-a4d3-0aa5c2c35751")
762    def test_win10_request_rebinding(self):
763        self._run_win10_request_renewing(bcast=True)
764
765    def _make_debian_paramrequestlist(self):
766        return make_paramrequestlist_opt([
767            'subnet_mask',
768            'broadcast_address',
769            'router',
770            'name_server',
771            119, # Domain Search
772            'hostname',
773            101, # TCode
774            'domain', # NetBIOS over TCP/IP Name Server
775            'vendor_specific', # NetBIOS over TCP/IP Node Type
776            121, # Classless Static Route
777            249, # Private/Classless Static Route (MS)
778            33, # Static Route
779            252, # Private/Proxy autodiscovery
780            'NTP_server',
781            ])
782
783    @test_tracker_info(uuid="b0bb6ae7-07e6-4ecb-9a2f-db9c8146a3d5")
784    def test_debian_dhclient_4_3_5_discover(self):
785        req_ip = NETADDR_PREFIX + '109'
786        req = self._make_discover(self.hwaddr,
787            options=[
788                ('requested_addr', req_ip),
789                ('hostname', b'test12'),
790                self._make_debian_paramrequestlist(),
791            ], opts_padding=bytes(26))
792        resp = self._get_response(req)
793        self._assert_offer(resp)
794        # Don't test for hostname option: previous implementation would not
795        # set it in offer, which was not consistent with ack
796        self._assert_standard_offer_or_ack(resp, ignore_hostname=True)
797        asserts.assert_equal(req_ip, get_yiaddr(resp))
798
799    @test_tracker_info(uuid="d70bc043-84cb-4735-9123-c46c6d1ce5ac")
800    def test_debian_dhclient_4_3_5_request_selecting(self):
801        req = self._make_request(self.hwaddr, None, None,
802            options=[
803                ('server_id', self.server_addr),
804                ('requested_addr', NETADDR_PREFIX + '109'),
805                ('hostname', b'test12'),
806                self._make_debian_paramrequestlist(),
807            ], opts_padding=bytes(20))
808        resp = self._get_response(req)
809        self._assert_ack(resp)
810        self._assert_standard_offer_or_ack(resp, with_hostname=True)
811
812    def _run_debian_renewing(self, bcast):
813        req_ip = NETADDR_PREFIX + '109'
814        req = self._make_request(self.hwaddr, None, None,
815            ciaddr=req_ip, ip_src=req_ip,
816            ip_dst=NETADDR_BROADCAST if bcast else self.server_addr,
817            options=[
818                ('hostname', b'test12'),
819                self._make_debian_paramrequestlist(),
820            ],
821            opts_padding=bytes(32))
822        resp = self._get_response(req)
823        self._assert_ack(resp)
824        self._assert_standard_offer_or_ack(resp, renewing=True,
825            with_hostname=True)
826
827    @test_tracker_info(uuid="5e1e817d-9972-46ca-8d44-1e120bf1bafc")
828    def test_debian_dhclient_4_3_5_request_renewing(self):
829        self._run_debian_renewing(bcast=False)
830
831    @test_tracker_info(uuid="b179a36d-910e-4006-a79a-11cc561b69db")
832    def test_debian_dhclient_4_3_5_request_rebinding(self):
833        self._run_debian_renewing(bcast=True)
834
835    def _assert_standard_offer_or_ack(self, resp, renewing=False, bcast=False,
836            ignore_hostname=False, with_hostname=False):
837        # Responses to renew/rebind are always unicast to ciaddr even with
838        # broadcast flag set (RFC does not define this behavior, but this is
839        # more efficient and matches previous behavior)
840        if bcast and not renewing:
841            self._assert_broadcast(resp)
842            self._assert_broadcastbit(resp, isset=True)
843        else:
844            # Previous implementation would set the broadcast flag but send a
845            # unicast reply if (bcast and renewing). This was not consistent and
846            # new implementation consistently clears the flag. Not testing for
847            # broadcast flag value to maintain compatibility.
848            self._assert_unicast(resp)
849
850        bootp_resp = resp.getlayer(BOOTP)
851        asserts.assert_equal(0, bootp_resp.hops)
852        if renewing:
853            asserts.assert_true(bootp_resp.ciaddr.startswith(NETADDR_PREFIX),
854                'ciaddr does not start with expected prefix')
855        else:
856            asserts.assert_equal(INET4_ANY, bootp_resp.ciaddr)
857        asserts.assert_true(bootp_resp.yiaddr.startswith(NETADDR_PREFIX),
858            'yiaddr does not start with expected prefix')
859        asserts.assert_true(bootp_resp.siaddr.startswith(NETADDR_PREFIX),
860            'siaddr does not start with expected prefix')
861        asserts.assert_equal(INET4_ANY, bootp_resp.giaddr)
862
863        opt_labels = get_opt_labels(bootp_resp)
864        # FQDN option 81 is not supported in new behavior
865        opt_labels = [opt for opt in opt_labels if opt != 81]
866
867        # Expect exactly these options in this order
868        expected_opts = [
869            'message-type', 'server_id', 'lease_time', 'renewal_time',
870            'rebinding_time', 'subnet_mask', 'broadcast_address', 'router',
871            'name_server']
872        if ignore_hostname:
873            opt_labels = [opt for opt in opt_labels if opt != 'hostname']
874        elif with_hostname:
875            expected_opts.append('hostname')
876        expected_opts.extend(['vendor_specific', 'end'])
877        asserts.assert_equal(expected_opts, opt_labels)
878
879    def _request_address(self, hwaddr, bcast=True):
880        resp = self._get_response(self._make_discover(hwaddr))
881        self._assert_offer(resp)
882        addr = get_yiaddr(resp)
883        siaddr = getopt(resp, 'server_id')
884        resp = self._get_response(self._make_request(hwaddr, addr, siaddr,
885                ip_dst=(INET4_ANY if bcast else siaddr)))
886        self._assert_ack(resp)
887        return addr, siaddr, resp
888
889    def _get_response(self, packet):
890        resp = srp1(packet, iface=self.iface, timeout=10, verbose=False)
891        bootp_resp = (resp or None) and resp.getlayer(BOOTP)
892        if bootp_resp != None and get_mess_type(bootp_resp) == ACK:
893            # Note down corresponding release for this request
894            release = self._make_release(bootp_resp.chaddr, bootp_resp.yiaddr,
895                getopt(bootp_resp, 'server_id'))
896            self.cleanup_releases.append(release)
897        return resp
898
899    def _send(self, packet):
900        sendp(packet, iface=self.iface, verbose=False)
901
902    def _assert_type(self, packet, tp):
903        asserts.assert_false(None == packet, "No packet")
904        asserts.assert_equal(tp, get_mess_type(packet))
905
906    def _assert_ack(self, packet):
907        self._assert_type(packet, ACK)
908
909    def _assert_nak(self, packet):
910        self._assert_type(packet, NAK)
911
912    def _assert_offer(self, packet):
913        self._assert_type(packet, OFFER)
914
915    def _assert_broadcast(self, packet):
916        asserts.assert_false(None == packet, "No packet")
917        asserts.assert_equal(packet.getlayer(Ether).dst, BROADCAST_MAC)
918        asserts.assert_equal(packet.getlayer(IP).dst, NETADDR_BROADCAST)
919        self._assert_broadcastbit(packet)
920
921    def _assert_broadcastbit(self, packet, isset=True):
922        mask = 0x8000
923        flag = packet.getlayer(BOOTP).flags
924        asserts.assert_equal(flag & mask, mask if isset else 0)
925
926    def _assert_unicast(self, packet, ipAddr=None):
927        asserts.assert_false(None == packet, "No packet")
928        asserts.assert_false(packet.getlayer(Ether).dst == BROADCAST_MAC,
929            "Layer 2 packet destination address was broadcast")
930        if ipAddr:
931            asserts.assert_equal(packet.getlayer(IP).dst, ipAddr)
932
933    def _assert_relayed(self, packet, giaddr):
934        self._assert_unicast(packet, giaddr)
935        asserts.assert_equal(giaddr, packet.getlayer(BOOTP).giaddr,
936            'Relayed response has invalid giaddr field')
937
938    def _next_hwaddr(self):
939        addr = make_hwaddr(self.next_hwaddr_index)
940        self.next_hwaddr_index = self.next_hwaddr_index + 1
941        return addr
942
943    def _make_dhcp(self, src_hwaddr, options, ciaddr=INET4_ANY,
944            ip_src=INET4_ANY, ip_dst=NETADDR_BROADCAST, giaddr=INET4_ANY,
945            bcastbit=False):
946        broadcast = (ip_dst == NETADDR_BROADCAST)
947        ethernet = Ether(dst=(BROADCAST_MAC if broadcast else self.dut_hwaddr))
948        ip = IP(src=ip_src, dst=ip_dst)
949        udp = UDP(sport=68, dport=SERVER_PORT)
950        bootp = BOOTP(chaddr=src_hwaddr, ciaddr=ciaddr, giaddr=giaddr,
951            flags=(0x8000 if bcastbit else 0), xid=random.randrange(0, 2**32))
952        dhcp = DHCP(options=options)
953        return ethernet / ip / udp / bootp / dhcp
954
955    def _make_discover(self, src_hwaddr, options = [], giaddr=INET4_ANY,
956            bcastbit=False, opts_padding=None, ip_src=INET4_ANY):
957        opts = [('message-type','discover')]
958        opts.extend(options)
959        opts.append('end')
960        if (opts_padding):
961            opts.append(opts_padding)
962        return self._make_dhcp(src_hwaddr, options=opts, giaddr=giaddr,
963            ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit, ip_src=ip_src)
964
965    def _make_request(self, src_hwaddr, reqaddr, siaddr, ciaddr=INET4_ANY,
966            ip_dst=None, ip_src=None, giaddr=INET4_ANY, bcastbit=False,
967            options=[], opts_padding=None):
968        if not ip_dst:
969            ip_dst = siaddr or INET4_ANY
970
971        if not ip_src and ip_dst == INET4_ANY:
972            ip_src = INET4_ANY
973        elif not ip_src:
974            ip_src = (giaddr if not isempty(giaddr)
975                else ciaddr if not isempty(ciaddr)
976                else reqaddr)
977        # Kernel will not receive unicast UDP packets with empty ip_src
978        asserts.assert_false(ip_dst != INET4_ANY and isempty(ip_src),
979            "Unicast ip_src cannot be zero")
980        opts = [('message-type', 'request')]
981        if options:
982            opts.extend(options)
983        else:
984            if siaddr:
985                opts.append(('server_id', siaddr))
986            if reqaddr:
987                opts.append(('requested_addr', reqaddr))
988        opts.append('end')
989        if opts_padding:
990            opts.append(opts_padding)
991        return self._make_dhcp(src_hwaddr, options=opts, ciaddr=ciaddr,
992            ip_src=ip_src, ip_dst=ip_dst, giaddr=giaddr, bcastbit=bcastbit)
993
994    def _make_release(self, src_hwaddr, addr, server_id):
995        opts = [('message-type', 'release'), ('server_id', server_id), 'end']
996        return self._make_dhcp(src_hwaddr, opts, ciaddr=addr, ip_src=addr,
997            ip_dst=server_id)
998
def assert_bootp_response(resp, req):
    """Check the fixed BOOTP header fields of resp and its xid against req.

    Returns:
        The BOOTP layer of resp.
    """
    bootp = resp.getlayer(BOOTP)
    fixed_fields = (
        (2, 'op', 'Invalid BOOTP op'),
        (1, 'htype', 'Invalid BOOTP htype'),
        (6, 'hlen', 'Invalid BOOTP hlen'),
        (0, 'hops', 'Invalid BOOTP hops'),
    )
    for expected, field, message in fixed_fields:
        asserts.assert_equal(expected, getattr(bootp, field), message)
    # The response must echo the transaction id of the request
    asserts.assert_equal(req.getlayer(BOOTP).xid, bootp.xid, 'Invalid XID')
    return bootp
1007
1008
def make_paramrequestlist_opt(params):
    """Build a 'param_req_list' option tuple from names and/or codes.

    Args:
        params: Iterable of option names (looked up in DHCPRevOptions) and/or
            integer option codes. Any other value is passed through as is.

    Returns:
        Tuple starting with 'param_req_list', followed by one entry per
        parameter; integer codes are encoded as single bytes.
    """
    # First pass: resolve option names to their numeric code
    resolved = []
    for param in params:
        if isinstance(param, str):
            param = DHCPRevOptions[param][0]
        resolved.append(param)

    # Second pass: encode every integer code as one byte
    option = ['param_req_list']
    for code in resolved:
        if isinstance(code, int):
            code = code.to_bytes(1, byteorder='big')
        option.append(code)
    return tuple(option)
1015
1016
def isempty(addr):
    """Return whether addr is falsy or equal to INET4_ANY."""
    if not addr:
        return True
    return addr == INET4_ANY
1019
1020
def setopt(packet, optname, val):
    """Set DHCP option optname to val on packet, adding it when absent.

    An existing option with that name is replaced in place. A new option is
    inserted right before the first 'end' marker, so it stays inside the
    option list even when padding follows 'end'. Without an 'end' marker it
    is inserted before the last entry.

    Args:
        packet: Packet carrying a DHCP layer; its options are modified.
        optname: Option label to set.
        val: New option value.
    """
    dhcp = packet.getlayer(DHCP)
    if optname in get_opt_labels(dhcp):
        dhcp.options = [(optname, val) if opt[0] == optname else opt
            for opt in dhcp.options]
        return

    try:
        insert_at = dhcp.options.index('end')
    except ValueError:
        # No 'end' marker: keep the historical "before the last entry" spot
        insert_at = len(dhcp.options) - 1
    dhcp.options.insert(insert_at, (optname, val))
1029
1030
def getopt(packet, key):
    """Return the value of the first DHCP option labelled key, or None."""
    dhcp_options = packet.getlayer(DHCP).options
    matches = [opt[1] for opt in dhcp_options if opt[0] == key]
    return next(iter(matches), None)
1034
1035
def removeopt(packet, key):
    """Drop every DHCP option of packet whose first item equals key."""
    dhcp = packet.getlayer(DHCP)
    kept = []
    for opt in dhcp.options:
        if opt[0] == key:
            continue
        kept.append(opt)
    dhcp.options = kept
1039
1040
def get_opt_labels(packet):
    """Return the label of each DHCP option in packet, skipping 'pad'."""
    labels = []
    for opt in packet.getlayer(DHCP).options:
        if opt == 'pad':
            continue
        # end option is a single string, not a tuple.
        labels.append(opt if isinstance(opt, str) else opt[0])
    return labels
1046
1047
def get_yiaddr(packet):
    """Return the yiaddr field of the packet's BOOTP layer."""
    bootp = packet.getlayer(BOOTP)
    return bootp.yiaddr
1050
1051
def get_chaddr(packet):
    """Return the first 6 bytes of the BOOTP chaddr field of packet."""
    bootp = packet.getlayer(BOOTP)
    # We use Ethernet addresses. Ignore address padding
    ethernet_addr_len = 6
    return bootp.chaddr[:ethernet_addr_len]
1055
1056
def get_mess_type(packet):
    """Return the value of the packet's 'message-type' DHCP option."""
    message_type = getopt(packet, 'message-type')
    return message_type
1059
1060
def make_hwaddr(index):
    """Return a 6-byte hardware address derived from index.

    The address is the fixed prefix 44:85:00:00 followed by index encoded
    as two big-endian bytes.

    Args:
        index: Integer in the range 0..0xffff.

    Raises:
        ValueError: If index is negative or greater than 0xffff.
    """
    # Reject negative values here too; they would otherwise fail inside
    # bytes() with an unrelated message.
    if not 0 <= index <= 0xffff:
        raise ValueError("Address index out of range")
    return b'\x44\x85\x00\x00' + bytes([index >> 8, index & 0xff])
1065
1066
def format_hwaddr(addr):
    """Format addr as colon-separated, two-digit lowercase hex octets."""
    octets = ('%02x' % octet for octet in addr)
    return ':'.join(octets)
1069