#!/usr/bin/python3.4
#
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re

from acts import asserts
from acts.test_decorators import test_tracker_info
from acts.test_utils.net import connectivity_const as cconsts
from acts.test_utils.wifi.aware import aware_const as aconsts
from acts.controllers.ap_lib.hostapd_constants import BAND_2G
from acts.controllers.ap_lib.hostapd_constants import BAND_5G
from acts.test_utils.wifi.aware import aware_test_utils as autils
from acts.test_utils.wifi import wifi_test_utils as wutils
from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
from scapy.all import *


class MacRandomNoLeakageTest(AwareBaseTest, WifiBaseTest):
    """Set of tests for Wi-Fi Aware MAC address randomization of NMI (NAN
    management interface) and NDI (NAN data interface)."""

    SERVICE_NAME = "GoogleTestServiceXYZ"
    ping_msg = 'PING'

    # Channels handed to configure_packet_capture() in setup_class().
    AWARE_DEFAULT_CHANNEL_5_BAND = 149
    AWARE_DEFAULT_CHANNEL_24_BAND = 6

    # Security types understood by create_date_path().
    ENCR_TYPE_OPEN = 0
    ENCR_TYPE_PASSPHRASE = 1
    ENCR_TYPE_PMK = 2

    PASSPHRASE = "This is some random passphrase - very very secure!!"
    PMK = "ODU0YjE3YzdmNDJiNWI4NTQ2NDJjNDI3M2VkZTQyZGU="

    # Zero-width match at every position that is preceded by a word character
    # and followed by a whole number of character pairs up to the end of the
    # string, i.e. the octet boundaries of an unseparated MAC string.
    _MAC_OCTET_BOUNDARY = re.compile(r"(?<=\w)(?=(?:\w\w)+$)")

    def __init__(self, controllers):
        # Both bases are initialised explicitly, WifiBaseTest first.
        WifiBaseTest.__init__(self, controllers)
        AwareBaseTest.__init__(self, controllers)

    def setup_class(self):
        """Require a packet capture device and tune it to the Aware channels."""
        asserts.assert_true(hasattr(self, 'packet_capture'),
                            "Needs packet_capture attribute to support sniffing.")
        self.configure_packet_capture(channel_5g=self.AWARE_DEFAULT_CHANNEL_5_BAND,
                                      channel_2g=self.AWARE_DEFAULT_CHANNEL_24_BAND)

    def setup_test(self):
        """Run the per-test setup of both base classes, WifiBaseTest first."""
        WifiBaseTest.setup_test(self)
        AwareBaseTest.setup_test(self)

    def teardown_test(self):
        """Run the per-test teardown of both base classes, WifiBaseTest first."""
        WifiBaseTest.teardown_test(self)
        AwareBaseTest.teardown_test(self)

    def verify_mac_no_leakage(self, pcap_procs, factory_mac_addresses, mac_addresses):
        """Check the captured traffic for factory and randomized MAC addresses.

        Args:
            pcap_procs: mapping indexed by BAND_2G / BAND_5G; element [1] of
                each entry is used as the capture file name prefix.
            factory_mac_addresses: MAC addresses that must not be found in the
                captures.
            mac_addresses: randomized MAC addresses that must be found in the
                captures.
        """
        # Get 2G and 5G pcaps
        pcap_5g = rdpcap('%s_%s.pcap' % (pcap_procs[BAND_5G][1], BAND_5G.upper()))
        pcap_2g = rdpcap('%s_%s.pcap' % (pcap_procs[BAND_2G][1], BAND_2G.upper()))
        pcaps = pcap_5g + pcap_2g

        # Verify factory MAC is not leaked in both 2G and 5G pcaps
        for mac in factory_mac_addresses:
            wutils.verify_mac_not_found_in_pcap(mac, pcaps)

        # Verify random MACs are being used and in pcaps
        for mac in mac_addresses:
            wutils.verify_mac_is_found_in_pcap(mac, pcaps)

    def transfer_mac_format(self, mac):
        """Lower-case a MAC string and insert ':' between its octets.

        Args:
            mac: String of mac without ':'

        Returns:
            Lower case String of mac like "xx:xx:xx:xx:xx:xx"
        """
        return self._MAC_OCTET_BOUNDARY.sub(":", mac.lower())

    def start_aware(self, dut, is_publish):
        """Start Aware attach, then start Publish/Subscribe based on role.

        Args:
            dut: Aware device
            is_publish: True for Publisher, False for subscriber

        Returns:
            dict with Aware discovery session info: "awareId", "discId" and
            "mac" (the NMI reported by the identity-changed event, formatted
            by transfer_mac_format).
        """
        # NOTE(review): the True argument presumably requests identity-change
        # callbacks, which the wait below depends on - confirm against the
        # wifiAwareAttach facade.
        aware_id = dut.droid.wifiAwareAttach(True)
        autils.wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACHED)
        identity_event = autils.wait_for_event(dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
        mac = self.transfer_mac_format(identity_event["data"]["mac"])
        dut.log.info("NMI=%s", mac)

        if is_publish:
            config = autils.create_discovery_config(self.SERVICE_NAME,
                                                    aconsts.PUBLISH_TYPE_UNSOLICITED)
            disc_id = dut.droid.wifiAwarePublish(aware_id, config)
            autils.wait_for_event(dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
        else:
            config = autils.create_discovery_config(self.SERVICE_NAME,
                                                    aconsts.SUBSCRIBE_TYPE_PASSIVE)
            disc_id = dut.droid.wifiAwareSubscribe(aware_id, config)
            autils.wait_for_event(dut, aconsts.SESSION_CB_ON_SUBSCRIBE_STARTED)

        return {"awareId": aware_id, "discId": disc_id, "mac": mac}

    def _wait_for_network_callback(self, dut, callback_event, req_key):
        """Wait on dut for the given network callback event of request req_key."""
        return autils.wait_for_event_with_keys(
            dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT,
            (cconsts.NETWORK_CB_KEY_EVENT, callback_event),
            (cconsts.NETWORK_CB_KEY_ID, req_key))

    def _send_ping(self, tx_dut, tx_session, rx_dut):
        """Send ping_msg from tx_dut to its session peer; return rx_dut's receive event."""
        msg_id = self.get_next_msg_id()
        tx_dut.droid.wifiAwareSendMessage(tx_session["discId"], tx_session["peerId"],
                                          msg_id, self.ping_msg, aconsts.MAX_TX_RETRIES)
        autils.wait_for_event(tx_dut, aconsts.SESSION_CB_ON_MESSAGE_SENT)
        return autils.wait_for_event(rx_dut, aconsts.SESSION_CB_ON_MESSAGE_RECEIVED)

    def create_date_path(self, p_dut, pub_session, s_dut, sub_session, sec_type):
        """Create NDP based on the security type (open, PMK, PASSPHRASE), run socket connect.

        Args:
            p_dut: Publish device
            pub_session: Publisher session dict; "discId" is read.
            s_dut: Subscribe device
            sub_session: Subscriber session dict; "discId" and "peerId" are
                read.
            sec_type: NDP security type (ENCR_TYPE_OPEN, ENCR_TYPE_PMK or
                ENCR_TYPE_PASSPHRASE)

        Returns:
            dict with NDP info: "pubReqKey", "pubIfMac", "subReqKey" and
            "subIfMac".
        """
        # Any sec_type other than PASSPHRASE / PMK leaves both credentials
        # unset.
        passphrase = self.PASSPHRASE if sec_type == self.ENCR_TYPE_PASSPHRASE else None
        pmk = self.PMK if sec_type == self.ENCR_TYPE_PMK else None

        # The publisher passes None as the peer id; the subscriber names the
        # peer it discovered.
        p_req_key = autils.request_network(
            p_dut,
            p_dut.droid.wifiAwareCreateNetworkSpecifier(pub_session["discId"],
                                                        None,
                                                        passphrase, pmk))
        s_req_key = autils.request_network(
            s_dut,
            s_dut.droid.wifiAwareCreateNetworkSpecifier(sub_session["discId"],
                                                        sub_session["peerId"],
                                                        passphrase, pmk))

        # Wait order is kept as publisher/subscriber capabilities, then
        # publisher/subscriber link properties.
        p_net_event_nc = self._wait_for_network_callback(
            p_dut, cconsts.NETWORK_CB_CAPABILITIES_CHANGED, p_req_key)
        s_net_event_nc = self._wait_for_network_callback(
            s_dut, cconsts.NETWORK_CB_CAPABILITIES_CHANGED, s_req_key)
        p_net_event_lp = self._wait_for_network_callback(
            p_dut, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED, p_req_key)
        s_net_event_lp = self._wait_for_network_callback(
            s_dut, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED, s_req_key)

        p_aware_if = p_net_event_lp["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
        s_aware_if = s_net_event_lp["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]
        p_if_mac = self.transfer_mac_format(autils.get_mac_addr(p_dut, p_aware_if))
        p_dut.log.info("NDI %s=%s", p_aware_if, p_if_mac)
        s_if_mac = self.transfer_mac_format(autils.get_mac_addr(s_dut, s_aware_if))
        s_dut.log.info("NDI %s=%s", s_aware_if, s_if_mac)

        # The subscriber's address is read from the publisher's event and
        # vice versa.
        # NOTE(review): presumably each side's capabilities carry the peer's
        # IPv6 address - confirm before "fixing" the apparent swap.
        s_ipv6 = p_net_event_nc["data"][aconsts.NET_CAP_IPV6]
        p_ipv6 = s_net_event_nc["data"][aconsts.NET_CAP_IPV6]
        asserts.assert_true(
            autils.verify_socket_connect(p_dut, s_dut, p_ipv6, s_ipv6, 0),
            "Failed socket link with Pub as Server")
        asserts.assert_true(
            autils.verify_socket_connect(s_dut, p_dut, s_ipv6, p_ipv6, 0),
            "Failed socket link with Sub as Server")

        return {"pubReqKey": p_req_key, "pubIfMac": p_if_mac,
                "subReqKey": s_req_key, "subIfMac": s_if_mac}

    @test_tracker_info(uuid="c9c66873-a8e0-4830-8baa-ada03223bcef")
    def test_ib_multi_data_path_mac_random_test(self):
        """Verify there is no factory MAC Address leakage during the Aware discovery, NDP creation,
        socket setup and IP service connection."""

        p_dut = self.android_devices[0]
        s_dut = self.android_devices[1]
        sec_types = [self.ENCR_TYPE_PMK, self.ENCR_TYPE_PASSPHRASE]

        self.log.info("Starting packet capture")
        pcap_procs = wutils.start_pcap(
            self.packet_capture, 'dual', self.test_name)

        # Everything between start and stop of the capture runs under
        # try/finally so a failed assert or event timeout cannot leave the
        # capture running.
        try:
            factory_mac_1 = p_dut.droid.wifigetFactorymacAddresses()[0]
            p_dut.log.info("Factory Address: %s", factory_mac_1)
            factory_mac_2 = s_dut.droid.wifigetFactorymacAddresses()[0]
            s_dut.log.info("Factory Address: %s", factory_mac_2)
            factory_mac_addresses = [factory_mac_1, factory_mac_2]

            # Start Aware and exchange messages
            publish_session = self.start_aware(p_dut, True)
            subscribe_session = self.start_aware(s_dut, False)
            mac_addresses = [publish_session["mac"], subscribe_session["mac"]]
            discovery_event = autils.wait_for_event(
                s_dut, aconsts.SESSION_CB_ON_SERVICE_DISCOVERED)
            subscribe_session["peerId"] = discovery_event['data'][aconsts.SESSION_CB_KEY_PEER_ID]

            # The publisher learns its peer id from the subscriber's first
            # message, then answers with a ping of its own.
            pub_rx_msg_event = self._send_ping(s_dut, subscribe_session, p_dut)
            publish_session["peerId"] = pub_rx_msg_event['data'][aconsts.SESSION_CB_KEY_PEER_ID]
            self._send_ping(p_dut, publish_session, s_dut)

            # Create Aware NDP
            p_req_keys = []
            s_req_keys = []
            for sec in sec_types:
                ndp_info = self.create_date_path(p_dut, publish_session,
                                                 s_dut, subscribe_session, sec)
                p_req_keys.append(ndp_info["pubReqKey"])
                s_req_keys.append(ndp_info["subReqKey"])
                mac_addresses.append(ndp_info["pubIfMac"])
                mac_addresses.append(ndp_info["subIfMac"])

            # clean-up
            for p_req_key in p_req_keys:
                p_dut.droid.connectivityUnregisterNetworkCallback(p_req_key)
            for s_req_key in s_req_keys:
                s_dut.droid.connectivityUnregisterNetworkCallback(s_req_key)
            p_dut.droid.wifiAwareDestroyAll()
            s_dut.droid.wifiAwareDestroyAll()
        finally:
            self.log.info("Stopping packet capture")
            # NOTE(review): the trailing False is presumably the test-status
            # flag that keeps the capture files on disk - confirm against
            # wutils.stop_pcap.
            wutils.stop_pcap(self.packet_capture, pcap_procs, False)

        self.verify_mac_no_leakage(pcap_procs, factory_mac_addresses, mac_addresses)