• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/python3.4
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
16
import re

from acts import asserts
from acts.test_decorators import test_tracker_info
from acts.test_utils.net import connectivity_const as cconsts
from acts.test_utils.wifi.aware import aware_const as aconsts
from acts.controllers.ap_lib.hostapd_constants import BAND_2G
from acts.controllers.ap_lib.hostapd_constants import BAND_5G
from acts.test_utils.wifi.aware import aware_test_utils as autils
from acts.test_utils.wifi import wifi_test_utils as wutils
from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
from scapy.all import *
28
29
class MacRandomNoLeakageTest(AwareBaseTest, WifiBaseTest):
    """Set of tests for Wi-Fi Aware MAC address randomization of NMI (NAN
    management interface) and NDI (NAN data interface).

    The test sniffs both bands while Aware discovery, messaging and data-path
    setup run, then checks the captures: the factory MAC addresses must not be
    found, and the NMI/NDI addresses reported by the devices must be found.
    """

    SERVICE_NAME = "GoogleTestServiceXYZ"
    ping_msg = 'PING'

    # Channels the packet capture device is configured to sniff on.
    AWARE_DEFAULT_CHANNEL_5_BAND = 149
    AWARE_DEFAULT_CHANNEL_24_BAND = 6

    # NDP security types understood by create_date_path().
    ENCR_TYPE_OPEN = 0
    ENCR_TYPE_PASSPHRASE = 1
    ENCR_TYPE_PMK = 2

    PASSPHRASE = "This is some random passphrase - very very secure!!"
    PMK = "ODU0YjE3YzdmNDJiNWI4NTQ2NDJjNDI3M2VkZTQyZGU="

    def __init__(self, controllers):
        """Initialize both base test classes with the given controllers."""
        WifiBaseTest.__init__(self, controllers)
        AwareBaseTest.__init__(self, controllers)

    def setup_class(self):
        """Require a packet capture device and configure its 5G/2G channels."""
        asserts.assert_true(hasattr(self, 'packet_capture'),
                            "Needs packet_capture attribute to support sniffing.")
        self.configure_packet_capture(channel_5g=self.AWARE_DEFAULT_CHANNEL_5_BAND,
                                      channel_2g=self.AWARE_DEFAULT_CHANNEL_24_BAND)

    def setup_test(self):
        """Run the per-test setup of both base classes."""
        WifiBaseTest.setup_test(self)
        AwareBaseTest.setup_test(self)

    def teardown_test(self):
        """Run the per-test teardown of both base classes."""
        WifiBaseTest.teardown_test(self)
        AwareBaseTest.teardown_test(self)

    @staticmethod
    def _pcap_file_name(pcap_procs, band):
        """Build the capture file name for one band from its pcap_procs entry."""
        return '%s_%s.pcap' % (pcap_procs[band][1], band.upper())

    def verify_mac_no_leakage(self, pcap_procs, factory_mac_addresses, mac_addresses):
        """Check the 2G and 5G captures for leaked and expected MAC addresses.

        Args:
            pcap_procs: mapping indexed by BAND_5G / BAND_2G; element [1] of each
                entry is used as the capture file name prefix.
            factory_mac_addresses: factory MACs that must not be found in the
                captures.
            mac_addresses: randomized MACs that must be found in the captures.
        """
        # Get 2G and 5G pcaps and merge them into one packet list.
        pcap_5g = rdpcap(self._pcap_file_name(pcap_procs, BAND_5G))
        pcap_2g = rdpcap(self._pcap_file_name(pcap_procs, BAND_2G))
        pcaps = pcap_5g + pcap_2g

        # Verify factory MAC is not leaked in both 2G and 5G pcaps
        for mac in factory_mac_addresses:
            wutils.verify_mac_not_found_in_pcap(mac, pcaps)

        # Verify random MACs are being used and in pcaps
        for mac in mac_addresses:
            wutils.verify_mac_is_found_in_pcap(mac, pcaps)

    def transfer_mac_format(self, mac):
        """Add ':' separators to a MAC string and convert it to lower case.

        Args:
            mac: String of a MAC address without ':' separators.

        Returns:
            Lower case MAC string like "xx:xx:xx:xx:xx:xx".
        """
        # Insert ':' at every position that is preceded by a word character and
        # followed by an even, non-zero number of word characters up to the end.
        return re.sub(r"(?<=\w)(?=(?:\w\w)+$)", ":", mac.lower())

    def start_aware(self, dut, is_publish):
        """Start Aware attach, then start Publish/Subscribe based on role.

        Args:
            dut: Aware device.
            is_publish: True for Publisher, False for Subscriber.

        Returns:
            dict with Aware discovery session info: "awareId", "discId" and
            "mac" (the address from the identity-changed event, formatted by
            transfer_mac_format()).
        """
        aware_id = dut.droid.wifiAwareAttach(True)
        autils.wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACHED)
        event = autils.wait_for_event(dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
        mac = self.transfer_mac_format(event["data"]["mac"])
        dut.log.info("NMI=%s", mac)

        if is_publish:
            config = autils.create_discovery_config(self.SERVICE_NAME,
                                                    aconsts.PUBLISH_TYPE_UNSOLICITED)
            disc_id = dut.droid.wifiAwarePublish(aware_id, config)
            autils.wait_for_event(dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
        else:
            config = autils.create_discovery_config(self.SERVICE_NAME,
                                                    aconsts.SUBSCRIBE_TYPE_PASSIVE)
            disc_id = dut.droid.wifiAwareSubscribe(aware_id, config)
            autils.wait_for_event(dut, aconsts.SESSION_CB_ON_SUBSCRIBE_STARTED)
        return {"awareId": aware_id, "discId": disc_id, "mac": mac}

    def _wait_for_network_callback(self, dut, req_key, callback_type):
        """Wait on dut for a network callback of callback_type tied to req_key."""
        return autils.wait_for_event_with_keys(
            dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
            (cconsts.NETWORK_CB_KEY_EVENT, callback_type),
            (cconsts.NETWORK_CB_KEY_ID, req_key))

    def create_date_path(self, p_dut, pub_session, s_dut, sub_session, sec_type):
        """Create NDP based on the security type (open, PMK, PASSPHRASE), then run
        socket connect in both directions.

        The method name is spelled "date", but it sets up an Aware data path.

        Args:
            p_dut: Publish device.
            pub_session: Publisher session dict; "discId" is read.
            s_dut: Subscribe device.
            sub_session: Subscriber session dict; "discId" and "peerId" are read.
            sec_type: NDP security type (ENCR_TYPE_OPEN, ENCR_TYPE_PMK or
                ENCR_TYPE_PASSPHRASE).

        Returns:
            dict with NDP info: "pubReqKey", "pubIfMac", "subReqKey", "subIfMac".
        """
        # Only the credential matching sec_type is set; both stay None for open.
        passphrase = self.PASSPHRASE if sec_type == self.ENCR_TYPE_PASSPHRASE else None
        pmk = self.PMK if sec_type == self.ENCR_TYPE_PMK else None

        # The publisher passes None as the peer id; the subscriber names its peer.
        p_req_key = autils.request_network(
            p_dut,
            p_dut.droid.wifiAwareCreateNetworkSpecifier(pub_session["discId"],
                                                        None,
                                                        passphrase, pmk))
        s_req_key = autils.request_network(
            s_dut,
            s_dut.droid.wifiAwareCreateNetworkSpecifier(sub_session["discId"],
                                                        sub_session["peerId"],
                                                        passphrase, pmk))

        # Wait order is kept as publisher/subscriber capabilities first, then
        # publisher/subscriber link properties.
        p_net_event_nc = self._wait_for_network_callback(
            p_dut, p_req_key, cconsts.NETWORK_CB_CAPABILITIES_CHANGED)
        s_net_event_nc = self._wait_for_network_callback(
            s_dut, s_req_key, cconsts.NETWORK_CB_CAPABILITIES_CHANGED)
        p_net_event_lp = self._wait_for_network_callback(
            p_dut, p_req_key, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED)
        s_net_event_lp = self._wait_for_network_callback(
            s_dut, s_req_key, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED)

        p_aware_if = p_net_event_lp["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
        s_aware_if = s_net_event_lp["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
        p_if_mac = self.transfer_mac_format(autils.get_mac_addr(p_dut, p_aware_if))
        p_dut.log.info("NDI %s=%s", p_aware_if, p_if_mac)
        s_if_mac = self.transfer_mac_format(autils.get_mac_addr(s_dut, s_aware_if))
        s_dut.log.info("NDI %s=%s", s_aware_if, s_if_mac)

        # NOTE(review): the publisher/subscriber swap below is kept as-is; it
        # presumably reflects that each side's capabilities event carries the
        # peer's IPv6 address - confirm before "fixing" it.
        s_ipv6 = p_net_event_nc["data"][aconsts.NET_CAP_IPV6]
        p_ipv6 = s_net_event_nc["data"][aconsts.NET_CAP_IPV6]
        asserts.assert_true(
            autils.verify_socket_connect(p_dut, s_dut, p_ipv6, s_ipv6, 0),
            "Failed socket link with Pub as Server")
        asserts.assert_true(
            autils.verify_socket_connect(s_dut, p_dut, s_ipv6, p_ipv6, 0),
            "Failed socket link with Sub as Server")

        return {"pubReqKey": p_req_key, "pubIfMac": p_if_mac,
                "subReqKey": s_req_key, "subIfMac": s_if_mac}

    @test_tracker_info(uuid="c9c66873-a8e0-4830-8baa-ada03223bcef")
    def test_ib_multi_data_path_mac_random_test(self):
        """Verify there is no factory MAC Address leakage during the Aware discovery, NDP creation,
        socket setup and IP service connection."""

        p_dut = self.android_devices[0]
        s_dut = self.android_devices[1]
        mac_addresses = []
        factory_mac_addresses = []
        sec_types = [self.ENCR_TYPE_PMK, self.ENCR_TYPE_PASSPHRASE]

        self.log.info("Starting packet capture")
        pcap_procs = wutils.start_pcap(
            self.packet_capture, 'dual', self.test_name)

        # Everything between starting and stopping the capture runs under
        # try/finally so a failing step does not leave the capture running.
        try:
            factory_mac_1 = p_dut.droid.wifigetFactorymacAddresses()[0]
            p_dut.log.info("Factory Address: %s", factory_mac_1)
            factory_mac_2 = s_dut.droid.wifigetFactorymacAddresses()[0]
            s_dut.log.info("Factory Address: %s", factory_mac_2)
            factory_mac_addresses.extend([factory_mac_1, factory_mac_2])

            # Start Aware and exchange messages
            publish_session = self.start_aware(p_dut, True)
            subscribe_session = self.start_aware(s_dut, False)
            mac_addresses.append(publish_session["mac"])
            mac_addresses.append(subscribe_session["mac"])
            discovery_event = autils.wait_for_event(
                s_dut, aconsts.SESSION_CB_ON_SERVICE_DISCOVERED)
            subscribe_session["peerId"] = (
                discovery_event['data'][aconsts.SESSION_CB_KEY_PEER_ID])

            # Subscriber -> Publisher; the received event supplies the
            # publisher's peer id for the subscriber.
            msg_id = self.get_next_msg_id()
            s_dut.droid.wifiAwareSendMessage(subscribe_session["discId"],
                                             subscribe_session["peerId"],
                                             msg_id, self.ping_msg, aconsts.MAX_TX_RETRIES)
            autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_MESSAGE_SENT)
            pub_rx_msg_event = autils.wait_for_event(
                p_dut, aconsts.SESSION_CB_ON_MESSAGE_RECEIVED)
            publish_session["peerId"] = (
                pub_rx_msg_event['data'][aconsts.SESSION_CB_KEY_PEER_ID])

            # Publisher -> Subscriber
            msg_id = self.get_next_msg_id()
            p_dut.droid.wifiAwareSendMessage(publish_session["discId"],
                                             publish_session["peerId"],
                                             msg_id, self.ping_msg, aconsts.MAX_TX_RETRIES)
            autils.wait_for_event(p_dut, aconsts.SESSION_CB_ON_MESSAGE_SENT)
            autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_MESSAGE_RECEIVED)

            # Create Aware NDP, one per security type
            p_req_keys = []
            s_req_keys = []
            for sec in sec_types:
                ndp_info = self.create_date_path(
                    p_dut, publish_session, s_dut, subscribe_session, sec)
                p_req_keys.append(ndp_info["pubReqKey"])
                s_req_keys.append(ndp_info["subReqKey"])
                mac_addresses.append(ndp_info["pubIfMac"])
                mac_addresses.append(ndp_info["subIfMac"])

            # clean-up
            for p_req_key in p_req_keys:
                p_dut.droid.connectivityUnregisterNetworkCallback(p_req_key)
            for s_req_key in s_req_keys:
                s_dut.droid.connectivityUnregisterNetworkCallback(s_req_key)
            p_dut.droid.wifiAwareDestroyAll()
            s_dut.droid.wifiAwareDestroyAll()
        finally:
            self.log.info("Stopping packet capture")
            # Third argument is passed unchanged from the original call;
            # presumably it tells stop_pcap to keep the capture files - confirm.
            wutils.stop_pcap(self.packet_capture, pcap_procs, False)

        self.verify_mac_no_leakage(pcap_procs, factory_mac_addresses, mac_addresses)
257