#!/usr/bin/python3.4 # # Copyright 2017 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import time from acts import asserts from acts.test_decorators import test_tracker_info from acts_contrib.test_utils.net import connectivity_const as cconsts from acts_contrib.test_utils.wifi import wifi_test_utils as wutils from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest from acts_contrib.test_utils.wifi.aware import aware_const as aconsts from acts_contrib.test_utils.wifi.aware import aware_test_utils as autils from acts_contrib.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest class DataPathTest(AwareBaseTest): """Set of tests for Wi-Fi Aware data-path.""" # configuration parameters used by tests ENCR_TYPE_OPEN = 0 ENCR_TYPE_PASSPHRASE = 1 ENCR_TYPE_PMK = 2 PASSPHRASE = "This is some random passphrase - very very secure!!" PASSPHRASE_MIN = "01234567" PASSPHRASE_MAX = "012345678901234567890123456789012345678901234567890123456789012" PMK = "ODU0YjE3YzdmNDJiNWI4NTQ2NDJjNDI3M2VkZTQyZGU=" PASSPHRASE2 = "This is some random passphrase - very very secure - but diff!!" 
PMK2 = "MTIzNDU2Nzg5MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTI=" PING_MSG = "ping" # message re-transmit counter (increases reliability in open-environment) # Note: reliability of message transmission is tested elsewhere MSG_RETX_COUNT = 5 # hard-coded max value, internal API # number of second to 'reasonably' wait to make sure that devices synchronize # with each other - useful for OOB test cases, where the OOB discovery would # take some time WAIT_FOR_CLUSTER = 5 def create_config(self, dtype): """Create a base configuration based on input parameters. Args: dtype: Publish or Subscribe discovery type Returns: Discovery configuration object. """ config = {} config[aconsts.DISCOVERY_KEY_DISCOVERY_TYPE] = dtype config[ aconsts.DISCOVERY_KEY_SERVICE_NAME] = "GoogleTestServiceDataPath" return config def request_network(self, dut, ns): """Request a Wi-Fi Aware network. Args: dut: Device ns: Network specifier Returns: the request key """ network_req = {"TransportType": 5, "NetworkSpecifier": ns} return dut.droid.connectivityRequestWifiAwareNetwork(network_req) def set_up_discovery(self, ptype, stype, get_peer_id, pub_on_both=False, pub_on_both_same=True): """Set up discovery sessions and wait for service discovery. Args: ptype: Publish discovery type stype: Subscribe discovery type get_peer_id: Send a message across to get the peer's id pub_on_both: If True then set up a publisher on both devices. The second publisher isn't used (existing to test use-case). pub_on_both_same: If True then the second publish uses an identical service name, otherwise a different service name. 
""" p_dut = self.android_devices[0] p_dut.pretty_name = "Publisher" s_dut = self.android_devices[1] s_dut.pretty_name = "Subscriber" # Publisher+Subscriber: attach and wait for confirmation p_id = p_dut.droid.wifiAwareAttach() autils.wait_for_event(p_dut, aconsts.EVENT_CB_ON_ATTACHED) time.sleep(self.device_startup_offset) s_id = s_dut.droid.wifiAwareAttach() autils.wait_for_event(s_dut, aconsts.EVENT_CB_ON_ATTACHED) # Publisher: start publish and wait for confirmation p_disc_id = p_dut.droid.wifiAwarePublish(p_id, self.create_config(ptype)) autils.wait_for_event(p_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED) # Optionally set up a publish session on the Subscriber device if pub_on_both: p2_config = self.create_config(ptype) if not pub_on_both_same: p2_config[aconsts.DISCOVERY_KEY_SERVICE_NAME] = ( p2_config[aconsts.DISCOVERY_KEY_SERVICE_NAME] + "-XYZXYZ") s_dut.droid.wifiAwarePublish(s_id, p2_config) autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED) # Subscriber: start subscribe and wait for confirmation s_disc_id = s_dut.droid.wifiAwareSubscribe(s_id, self.create_config(stype)) autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_SUBSCRIBE_STARTED) # Subscriber: wait for service discovery discovery_event = autils.wait_for_event( s_dut, aconsts.SESSION_CB_ON_SERVICE_DISCOVERED) peer_id_on_sub = discovery_event["data"][ aconsts.SESSION_CB_KEY_PEER_ID] peer_id_on_pub = None if get_peer_id: # only need message to receive peer ID # Subscriber: send message to peer (Publisher - so it knows our address) s_dut.droid.wifiAwareSendMessage(s_disc_id, peer_id_on_sub, self.get_next_msg_id(), self.PING_MSG, self.MSG_RETX_COUNT) autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_MESSAGE_SENT) # Publisher: wait for received message pub_rx_msg_event = autils.wait_for_event( p_dut, aconsts.SESSION_CB_ON_MESSAGE_RECEIVED) peer_id_on_pub = pub_rx_msg_event["data"][ aconsts.SESSION_CB_KEY_PEER_ID] return (p_dut, s_dut, p_id, s_id, p_disc_id, s_disc_id, peer_id_on_sub, 
peer_id_on_pub) def run_ib_data_path_test(self, ptype, stype, encr_type, use_peer_id, passphrase_to_use=None, pub_on_both=False, pub_on_both_same=True, expect_failure=False): """Runs the in-band data-path tests. Args: ptype: Publish discovery type stype: Subscribe discovery type encr_type: Encryption type, one of ENCR_TYPE_* use_peer_id: On Responder (publisher): True to use peer ID, False to accept any request passphrase_to_use: The passphrase to use if encr_type=ENCR_TYPE_PASSPHRASE If None then use self.PASSPHRASE pub_on_both: If True then set up a publisher on both devices. The second publisher isn't used (existing to test use-case). pub_on_both_same: If True then the second publish uses an identical service name, otherwise a different service name. expect_failure: If True then don't expect NDP formation, otherwise expect NDP setup to succeed. """ (p_dut, s_dut, p_id, s_id, p_disc_id, s_disc_id, peer_id_on_sub, peer_id_on_pub) = self.set_up_discovery( ptype, stype, use_peer_id, pub_on_both=pub_on_both, pub_on_both_same=pub_on_both_same) passphrase = None pmk = None if encr_type == self.ENCR_TYPE_PASSPHRASE: passphrase = (self.PASSPHRASE if passphrase_to_use == None else passphrase_to_use) elif encr_type == self.ENCR_TYPE_PMK: pmk = self.PMK port = 1234 transport_protocol = 6 # TCP/IP # Publisher: request network if encr_type == self.ENCR_TYPE_OPEN: p_req_key = self.request_network( p_dut, p_dut.droid.wifiAwareCreateNetworkSpecifier( p_disc_id, peer_id_on_pub if use_peer_id else None, passphrase, pmk)) else: p_req_key = self.request_network( p_dut, p_dut.droid.wifiAwareCreateNetworkSpecifier( p_disc_id, peer_id_on_pub if use_peer_id else None, passphrase, pmk, port, transport_protocol)) # Subscriber: request network s_req_key = self.request_network( s_dut, s_dut.droid.wifiAwareCreateNetworkSpecifier( s_disc_id, peer_id_on_sub, passphrase, pmk)) if expect_failure: # Publisher & Subscriber: expect unavailable callbacks autils.wait_for_event_with_keys( p_dut, 
cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_UNAVAILABLE)) autils.wait_for_event_with_keys( s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_UNAVAILABLE)) else: # Publisher & Subscriber: wait for network formation p_net_event_nc = autils.wait_for_event_with_keys( p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, p_req_key)) s_net_event_nc = autils.wait_for_event_with_keys( s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, s_req_key)) # validate no leak of information asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in p_net_event_nc[ "data"], "Network specifier leak!") asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in s_net_event_nc[ "data"], "Network specifier leak!") # note that Pub <-> Sub since IPv6 are of peer's! 
s_ipv6 = p_net_event_nc["data"][aconsts.NET_CAP_IPV6] p_ipv6 = s_net_event_nc["data"][aconsts.NET_CAP_IPV6] self.verify_network_info( p_net_event_nc["data"], s_net_event_nc["data"], encr_type == self.ENCR_TYPE_OPEN, port, transport_protocol) p_net_event_lp = autils.wait_for_event_with_keys( p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, p_req_key)) s_net_event_lp = autils.wait_for_event_with_keys( s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, s_req_key)) p_aware_if = p_net_event_lp["data"][ cconsts.NETWORK_CB_KEY_INTERFACE_NAME] s_aware_if = s_net_event_lp["data"][ cconsts.NETWORK_CB_KEY_INTERFACE_NAME] self.log.info("Interface names: p=%s, s=%s", p_aware_if, s_aware_if) self.log.info("Interface addresses (IPv6): p=%s, s=%s", p_ipv6, s_ipv6) # open sockets to test connection asserts.assert_true( autils.verify_socket_connect(p_dut, s_dut, p_ipv6, s_ipv6, 0), "Failed socket link with Pub as Server") asserts.assert_true( autils.verify_socket_connect(s_dut, p_dut, s_ipv6, p_ipv6, 0), "Failed socket link with Sub as Server") # terminate sessions and wait for ON_LOST callbacks p_dut.droid.wifiAwareDestroy(p_id) s_dut.droid.wifiAwareDestroy(s_id) autils.wait_for_event_with_keys( p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LOST), (cconsts.NETWORK_CB_KEY_ID, p_req_key)) autils.wait_for_event_with_keys( s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LOST), (cconsts.NETWORK_CB_KEY_ID, s_req_key)) # clean-up p_dut.droid.connectivityUnregisterNetworkCallback(p_req_key) s_dut.droid.connectivityUnregisterNetworkCallback(s_req_key) def run_oob_data_path_test(self, encr_type, use_peer_id, 
setup_discovery_sessions=False, expect_failure=False): """Runs the out-of-band data-path tests. Args: encr_type: Encryption type, one of ENCR_TYPE_* use_peer_id: On Responder: True to use peer ID, False to accept any request setup_discovery_sessions: If True also set up a (spurious) discovery session (pub on both sides, sub on Responder side). Validates a corner case. expect_failure: If True then don't expect NDP formation, otherwise expect NDP setup to succeed. """ init_dut = self.android_devices[0] init_dut.pretty_name = "Initiator" resp_dut = self.android_devices[1] resp_dut.pretty_name = "Responder" # Initiator+Responder: attach and wait for confirmation & identity init_id = init_dut.droid.wifiAwareAttach(True) autils.wait_for_event(init_dut, aconsts.EVENT_CB_ON_ATTACHED) init_ident_event = autils.wait_for_event( init_dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED) init_mac = init_ident_event["data"]["mac"] time.sleep(self.device_startup_offset) resp_id = resp_dut.droid.wifiAwareAttach(True) autils.wait_for_event(resp_dut, aconsts.EVENT_CB_ON_ATTACHED) resp_ident_event = autils.wait_for_event( resp_dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED) resp_mac = resp_ident_event["data"]["mac"] # wait for for devices to synchronize with each other - there are no other # mechanisms to make sure this happens for OOB discovery (except retrying # to execute the data-path request) time.sleep(self.WAIT_FOR_CLUSTER) if setup_discovery_sessions: init_dut.droid.wifiAwarePublish( init_id, self.create_config(aconsts.PUBLISH_TYPE_UNSOLICITED)) autils.wait_for_event(init_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED) resp_dut.droid.wifiAwarePublish( resp_id, self.create_config(aconsts.PUBLISH_TYPE_UNSOLICITED)) autils.wait_for_event(resp_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED) resp_dut.droid.wifiAwareSubscribe( resp_id, self.create_config(aconsts.SUBSCRIBE_TYPE_PASSIVE)) autils.wait_for_event(resp_dut, aconsts.SESSION_CB_ON_SUBSCRIBE_STARTED) autils.wait_for_event(resp_dut, 
aconsts.SESSION_CB_ON_SERVICE_DISCOVERED) passphrase = None pmk = None if encr_type == self.ENCR_TYPE_PASSPHRASE: passphrase = self.PASSPHRASE elif encr_type == self.ENCR_TYPE_PMK: pmk = self.PMK # Responder: request network resp_req_key = self.request_network( resp_dut, resp_dut.droid.wifiAwareCreateNetworkSpecifierOob( resp_id, aconsts.DATA_PATH_RESPONDER, init_mac if use_peer_id else None, passphrase, pmk)) # Initiator: request network init_req_key = self.request_network( init_dut, init_dut.droid.wifiAwareCreateNetworkSpecifierOob( init_id, aconsts.DATA_PATH_INITIATOR, resp_mac, passphrase, pmk)) if expect_failure: # Initiator & Responder: expect unavailable callbacks autils.wait_for_event_with_keys( init_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_UNAVAILABLE)) autils.wait_for_event_with_keys( resp_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_UNAVAILABLE)) else: # Initiator & Responder: wait for network formation init_net_event_nc = autils.wait_for_event_with_keys( init_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, init_req_key)) resp_net_event_nc = autils.wait_for_event_with_keys( resp_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, resp_req_key)) # validate no leak of information asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in init_net_event_nc[ "data"], "Network specifier leak!") asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in resp_net_event_nc[ "data"], "Network specifier leak!") # note that Init <-> Resp since IPv6 are of peer's! 
init_ipv6 = resp_net_event_nc["data"][aconsts.NET_CAP_IPV6] resp_ipv6 = init_net_event_nc["data"][aconsts.NET_CAP_IPV6] init_net_event_lp = autils.wait_for_event_with_keys( init_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, init_req_key)) resp_net_event_lp = autils.wait_for_event_with_keys( resp_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, resp_req_key)) init_aware_if = init_net_event_lp["data"][ cconsts.NETWORK_CB_KEY_INTERFACE_NAME] resp_aware_if = resp_net_event_lp["data"][ cconsts.NETWORK_CB_KEY_INTERFACE_NAME] self.log.info("Interface names: I=%s, R=%s", init_aware_if, resp_aware_if) self.log.info("Interface addresses (IPv6): I=%s, R=%s", init_ipv6, resp_ipv6) # open sockets to test connection asserts.assert_true( autils.verify_socket_connect(init_dut, resp_dut, init_ipv6, resp_ipv6, 0), "Failed socket link with Initiator as Server") asserts.assert_true( autils.verify_socket_connect(resp_dut, init_dut, resp_ipv6, init_ipv6, 0), "Failed socket link with Responder as Server") # terminate sessions and wait for ON_LOST callbacks init_dut.droid.wifiAwareDestroy(init_id) resp_dut.droid.wifiAwareDestroy(resp_id) autils.wait_for_event_with_keys( init_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LOST), (cconsts.NETWORK_CB_KEY_ID, init_req_key)) autils.wait_for_event_with_keys( resp_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LOST), (cconsts.NETWORK_CB_KEY_ID, resp_req_key)) # clean-up resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_key) init_dut.droid.connectivityUnregisterNetworkCallback(init_req_key) def run_mismatched_ib_data_path_test(self, pub_mismatch, sub_mismatch): """Runs 
the negative in-band data-path tests: mismatched peer ID. Args: pub_mismatch: Mismatch the publisher's ID sub_mismatch: Mismatch the subscriber's ID """ (p_dut, s_dut, p_id, s_id, p_disc_id, s_disc_id, peer_id_on_sub, peer_id_on_pub) = self.set_up_discovery( aconsts.PUBLISH_TYPE_UNSOLICITED, aconsts.SUBSCRIBE_TYPE_PASSIVE, True) if pub_mismatch: peer_id_on_pub = peer_id_on_pub - 1 if sub_mismatch: peer_id_on_sub = peer_id_on_sub - 1 # Publisher: request network p_req_key = self.request_network( p_dut, p_dut.droid.wifiAwareCreateNetworkSpecifier( p_disc_id, peer_id_on_pub, None)) # Subscriber: request network s_req_key = self.request_network( s_dut, s_dut.droid.wifiAwareCreateNetworkSpecifier( s_disc_id, peer_id_on_sub, None)) # Publisher & Subscriber: # - expect unavailable callbacks on the party with the bad ID # - also expect unavailable on the Initiator party (i.e. the # Subscriber) if the Publisher has a bad ID # - but a Publisher with a valid ID will keep waiting ... autils.wait_for_event_with_keys( s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_UNAVAILABLE)) if pub_mismatch: autils.wait_for_event_with_keys( p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_UNAVAILABLE)) else: time.sleep(autils.EVENT_NDP_TIMEOUT) autils.fail_on_event_with_keys( p_dut, cconsts.EVENT_NETWORK_CALLBACK, 0, (cconsts.NETWORK_CB_KEY_ID, p_req_key)) # clean-up p_dut.droid.connectivityUnregisterNetworkCallback(p_req_key) s_dut.droid.connectivityUnregisterNetworkCallback(s_req_key) def run_mismatched_oob_data_path_test(self, init_mismatch_mac=False, resp_mismatch_mac=False, init_encr_type=ENCR_TYPE_OPEN, resp_encr_type=ENCR_TYPE_OPEN): """Runs the negative out-of-band data-path tests: mismatched information between Responder and Initiator. 
Args: init_mismatch_mac: True to mismatch the Initiator MAC address resp_mismatch_mac: True to mismatch the Responder MAC address init_encr_type: Encryption type of Initiator - ENCR_TYPE_* resp_encr_type: Encryption type of Responder - ENCR_TYPE_* """ init_dut = self.android_devices[0] init_dut.pretty_name = "Initiator" resp_dut = self.android_devices[1] resp_dut.pretty_name = "Responder" # Initiator+Responder: attach and wait for confirmation & identity init_id = init_dut.droid.wifiAwareAttach(True) autils.wait_for_event(init_dut, aconsts.EVENT_CB_ON_ATTACHED) init_ident_event = autils.wait_for_event( init_dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED) init_mac = init_ident_event["data"]["mac"] time.sleep(self.device_startup_offset) resp_id = resp_dut.droid.wifiAwareAttach(True) autils.wait_for_event(resp_dut, aconsts.EVENT_CB_ON_ATTACHED) resp_ident_event = autils.wait_for_event( resp_dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED) resp_mac = resp_ident_event["data"]["mac"] if init_mismatch_mac: # assumes legit ones don't start with "00" init_mac = "00" + init_mac[2:] if resp_mismatch_mac: resp_mac = "00" + resp_mac[2:] # wait for for devices to synchronize with each other - there are no other # mechanisms to make sure this happens for OOB discovery (except retrying # to execute the data-path request) time.sleep(self.WAIT_FOR_CLUSTER) # set up separate keys: even if types are the same we want a mismatch init_passphrase = None init_pmk = None if init_encr_type == self.ENCR_TYPE_PASSPHRASE: init_passphrase = self.PASSPHRASE elif init_encr_type == self.ENCR_TYPE_PMK: init_pmk = self.PMK resp_passphrase = None resp_pmk = None if resp_encr_type == self.ENCR_TYPE_PASSPHRASE: resp_passphrase = self.PASSPHRASE2 elif resp_encr_type == self.ENCR_TYPE_PMK: resp_pmk = self.PMK2 # Responder: request network resp_req_key = self.request_network( resp_dut, resp_dut.droid.wifiAwareCreateNetworkSpecifierOob( resp_id, aconsts.DATA_PATH_RESPONDER, init_mac, resp_passphrase, resp_pmk)) # 
Initiator: request network init_req_key = self.request_network( init_dut, init_dut.droid.wifiAwareCreateNetworkSpecifierOob( init_id, aconsts.DATA_PATH_INITIATOR, resp_mac, init_passphrase, init_pmk)) # Initiator & Responder: # - expect unavailable on the Initiator party if the # Initiator and Responder with mac or encryption mismatch # - For responder: # - If mac mismatch, responder will keep waiting ... # - If encryption mismatch, responder expect unavailable autils.wait_for_event_with_keys( init_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_UNAVAILABLE)) time.sleep(autils.EVENT_NDP_TIMEOUT) if init_mismatch_mac or resp_mismatch_mac: autils.fail_on_event_with_keys( resp_dut, cconsts.EVENT_NETWORK_CALLBACK, 0, (cconsts.NETWORK_CB_KEY_ID, resp_req_key)) else: autils.wait_for_event_with_keys( resp_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_UNAVAILABLE)) # clean-up resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_key) init_dut.droid.connectivityUnregisterNetworkCallback(init_req_key) def verify_network_info(self, p_data, s_data, open, port, transport_protocol): """Verify that the port and transport protocol information is correct. - should only exist on subscriber (received from publisher) and match transmitted values - should only exist on an encrypted NDP Args: p_data, s_data: Pub and Sub (respectively) net cap event data. open: True if NDP unencrypted, False if encrypted. port: Expected port value. transport_protocol: Expected transport protocol value. 
""" asserts.assert_true(aconsts.NET_CAP_PORT not in p_data, "port info not expected on Pub") asserts.assert_true(aconsts.NET_CAP_TRANSPORT_PROTOCOL not in p_data, "transport protocol info not expected on Pub") if open: asserts.assert_true(aconsts.NET_CAP_PORT not in s_data, "port info not expected on Sub (open NDP)") asserts.assert_true( aconsts.NET_CAP_TRANSPORT_PROTOCOL not in s_data, "transport protocol info not expected on Sub (open NDP)") else: asserts.assert_equal(s_data[aconsts.NET_CAP_PORT], port, "Port info does not match on Sub (from Pub)") asserts.assert_equal( s_data[aconsts.NET_CAP_TRANSPORT_PROTOCOL], transport_protocol, "Transport protocol info does not match on Sub (from Pub)") ####################################### # Positive In-Band (IB) tests key: # # names is: test_ib____ # where: # # pub_type: Type of publish discovery session: unsolicited or solicited. # sub_type: Type of subscribe discovery session: passive or active. # encr_type: Encription type: open, passphrase # peer_spec: Peer specification method: any or specific # # Note: In-Band means using Wi-Fi Aware for discovery and referring to the # peer using the Aware-provided peer handle (as opposed to a MAC address). ####################################### @test_tracker_info(uuid="fa30bedc-d1de-4440-bf25-ec00d10555af") @WifiBaseTest.wifi_test_wrap def test_ib_unsolicited_passive_open_specific(self): """Data-path: in-band, unsolicited/passive, open encryption, specific peer Verifies end-to-end discovery + data-path creation. """ self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_UNSOLICITED, stype=aconsts.SUBSCRIBE_TYPE_PASSIVE, encr_type=self.ENCR_TYPE_OPEN, use_peer_id=True) @test_tracker_info(uuid="57fc9d53-32ae-470f-a8b1-2fe37893687d") @WifiBaseTest.wifi_test_wrap def test_ib_unsolicited_passive_open_any(self): """Data-path: in-band, unsolicited/passive, open encryption, any peer Verifies end-to-end discovery + data-path creation. 
""" self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_UNSOLICITED, stype=aconsts.SUBSCRIBE_TYPE_PASSIVE, encr_type=self.ENCR_TYPE_OPEN, use_peer_id=False) @test_tracker_info(uuid="93b2a23d-8579-448a-936c-7812929464cf") @WifiBaseTest.wifi_test_wrap def test_ib_unsolicited_passive_passphrase_specific(self): """Data-path: in-band, unsolicited/passive, passphrase, specific peer Verifies end-to-end discovery + data-path creation. """ self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_UNSOLICITED, stype=aconsts.SUBSCRIBE_TYPE_PASSIVE, encr_type=self.ENCR_TYPE_PASSPHRASE, use_peer_id=True) @test_tracker_info(uuid="1736126f-a0ff-4712-acc4-f89b4eef5716") @WifiBaseTest.wifi_test_wrap def test_ib_unsolicited_passive_passphrase_any(self): """Data-path: in-band, unsolicited/passive, passphrase, any peer Verifies end-to-end discovery + data-path creation. """ self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_UNSOLICITED, stype=aconsts.SUBSCRIBE_TYPE_PASSIVE, encr_type=self.ENCR_TYPE_PASSPHRASE, use_peer_id=False) @test_tracker_info(uuid="b9353d5b-3f77-46bf-bfd9-65d56a7c939a") @WifiBaseTest.wifi_test_wrap def test_ib_unsolicited_passive_pmk_specific(self): """Data-path: in-band, unsolicited/passive, PMK, specific peer Verifies end-to-end discovery + data-path creation. """ self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_UNSOLICITED, stype=aconsts.SUBSCRIBE_TYPE_PASSIVE, encr_type=self.ENCR_TYPE_PMK, use_peer_id=True) @test_tracker_info(uuid="06f3b2ab-4a10-4398-83a4-6a23851b1662") @WifiBaseTest.wifi_test_wrap def test_ib_unsolicited_passive_pmk_any(self): """Data-path: in-band, unsolicited/passive, PMK, any peer Verifies end-to-end discovery + data-path creation. 
""" self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_UNSOLICITED, stype=aconsts.SUBSCRIBE_TYPE_PASSIVE, encr_type=self.ENCR_TYPE_PMK, use_peer_id=False) @test_tracker_info(uuid="0ed7d8b3-a69e-46ba-aeb7-13e507ecf290") @WifiBaseTest.wifi_test_wrap def test_ib_solicited_active_open_specific(self): """Data-path: in-band, solicited/active, open encryption, specific peer Verifies end-to-end discovery + data-path creation. """ self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_SOLICITED, stype=aconsts.SUBSCRIBE_TYPE_ACTIVE, encr_type=self.ENCR_TYPE_OPEN, use_peer_id=True) @test_tracker_info(uuid="c7ba6d28-5ef6-45d9-95d5-583ad6d981f3") @WifiBaseTest.wifi_test_wrap def test_ib_solicited_active_open_any(self): """Data-path: in-band, solicited/active, open encryption, any peer Verifies end-to-end discovery + data-path creation. """ self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_SOLICITED, stype=aconsts.SUBSCRIBE_TYPE_ACTIVE, encr_type=self.ENCR_TYPE_OPEN, use_peer_id=False) @test_tracker_info(uuid="388cea99-0e2e-49ea-b00e-f3e56b6236e5") @WifiBaseTest.wifi_test_wrap def test_ib_solicited_active_passphrase_specific(self): """Data-path: in-band, solicited/active, passphrase, specific peer Verifies end-to-end discovery + data-path creation. """ self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_SOLICITED, stype=aconsts.SUBSCRIBE_TYPE_ACTIVE, encr_type=self.ENCR_TYPE_PASSPHRASE, use_peer_id=True) @test_tracker_info(uuid="fcd3e28a-5eab-4169-8a0c-dc7204dcdc13") @WifiBaseTest.wifi_test_wrap def test_ib_solicited_active_passphrase_any(self): """Data-path: in-band, solicited/active, passphrase, any peer Verifies end-to-end discovery + data-path creation. 
""" self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_SOLICITED, stype=aconsts.SUBSCRIBE_TYPE_ACTIVE, encr_type=self.ENCR_TYPE_PASSPHRASE, use_peer_id=False) @test_tracker_info(uuid="9d4eaad7-ba53-4a06-8ce0-e308daea3309") @WifiBaseTest.wifi_test_wrap def test_ib_solicited_active_pmk_specific(self): """Data-path: in-band, solicited/active, PMK, specific peer Verifies end-to-end discovery + data-path creation. """ self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_SOLICITED, stype=aconsts.SUBSCRIBE_TYPE_ACTIVE, encr_type=self.ENCR_TYPE_PMK, use_peer_id=True) @test_tracker_info(uuid="129d850e-c312-4137-a67b-05ae95fe66cc") @WifiBaseTest.wifi_test_wrap def test_ib_solicited_active_pmk_any(self): """Data-path: in-band, solicited/active, PMK, any peer Verifies end-to-end discovery + data-path creation. """ self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_SOLICITED, stype=aconsts.SUBSCRIBE_TYPE_ACTIVE, encr_type=self.ENCR_TYPE_PMK, use_peer_id=False) ####################################### # Positive In-Band (IB) with a publish session running on the subscriber # tests key: # # names is: test_ib_extra_pub___ # __ # where: # # same|diff: Whether the extra publish session (on the subscriber) is the same # or different from the primary session. # pub_type: Type of publish discovery session: unsolicited or solicited. # sub_type: Type of subscribe discovery session: passive or active. # encr_type: Encryption type: open, passphrase # peer_spec: Peer specification method: any or specific # # Note: In-Band means using Wi-Fi Aware for discovery and referring to the # peer using the Aware-provided peer handle (as opposed to a MAC address). ####################################### @test_tracker_info(uuid="e855dd81-45c8-4bb2-a204-7687c48ff843") @WifiBaseTest.wifi_test_wrap def test_ib_extra_pub_same_unsolicited_passive_open_specific(self): """Data-path: in-band, unsolicited/passive, open encryption, specific peer. 
Configuration contains a publisher (for the same service) running on *both* devices. Verifies end-to-end discovery + data-path creation. """ self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_UNSOLICITED, stype=aconsts.SUBSCRIBE_TYPE_PASSIVE, encr_type=self.ENCR_TYPE_OPEN, use_peer_id=True, pub_on_both=True, pub_on_both_same=True) @test_tracker_info(uuid="228ea657-82e6-44bc-8369-a2c719a5e252") @WifiBaseTest.wifi_test_wrap def test_ib_extra_pub_same_unsolicited_passive_open_any(self): """Data-path: in-band, unsolicited/passive, open encryption, any peer. Configuration contains a publisher (for the same service) running on *both* devices. Verifies end-to-end discovery + data-path creation. """ self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_UNSOLICITED, stype=aconsts.SUBSCRIBE_TYPE_PASSIVE, encr_type=self.ENCR_TYPE_OPEN, use_peer_id=False, pub_on_both=True, pub_on_both_same=True) @test_tracker_info(uuid="7a32f439-d745-4716-a75e-b54109aaaf82") @WifiBaseTest.wifi_test_wrap def test_ib_extra_pub_diff_unsolicited_passive_open_specific(self): """Data-path: in-band, unsolicited/passive, open encryption, specific peer. Configuration contains a publisher (for a different service) running on *both* devices. Verifies end-to-end discovery + data-path creation. """ self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_UNSOLICITED, stype=aconsts.SUBSCRIBE_TYPE_PASSIVE, encr_type=self.ENCR_TYPE_OPEN, use_peer_id=True, pub_on_both=True, pub_on_both_same=False) @test_tracker_info(uuid="a14ddc66-88fd-4b49-ab37-225533867c63") @WifiBaseTest.wifi_test_wrap def test_ib_extra_pub_diff_unsolicited_passive_open_any(self): """Data-path: in-band, unsolicited/passive, open encryption, any peer. Configuration contains a publisher (for a different service) running on *both* devices. Verifies end-to-end discovery + data-path creation. 
""" self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_UNSOLICITED, stype=aconsts.SUBSCRIBE_TYPE_PASSIVE, encr_type=self.ENCR_TYPE_OPEN, use_peer_id=False, pub_on_both=True, pub_on_both_same=False) ####################################### # Positive Out-of-Band (OOB) tests key: # # names is: test_oob__ # where: # # encr_type: Encryption type: open, passphrase # peer_spec: Peer specification method: any or specific # # Optionally set up an extra discovery session to test coexistence. If so # add "ib_coex" to test name. # # Note: Out-of-Band means using a non-Wi-Fi Aware mechanism for discovery and # exchange of MAC addresses and then Wi-Fi Aware for data-path. ####################################### @test_tracker_info(uuid="7db17d8c-1dce-4084-b695-215bbcfe7d41") @WifiBaseTest.wifi_test_wrap def test_oob_open_specific(self): """Data-path: out-of-band, open encryption, specific peer Verifies end-to-end discovery + data-path creation. """ self.run_oob_data_path_test( encr_type=self.ENCR_TYPE_OPEN, use_peer_id=True) @test_tracker_info(uuid="ad416d89-cb95-4a07-8d29-ee213117450b") @WifiBaseTest.wifi_test_wrap def test_oob_open_any(self): """Data-path: out-of-band, open encryption, any peer Verifies end-to-end discovery + data-path creation. """ self.run_oob_data_path_test( encr_type=self.ENCR_TYPE_OPEN, use_peer_id=False) @test_tracker_info(uuid="74937a3a-d524-43e2-8979-4449271cab52") @WifiBaseTest.wifi_test_wrap def test_oob_passphrase_specific(self): """Data-path: out-of-band, passphrase, specific peer Verifies end-to-end discovery + data-path creation. """ self.run_oob_data_path_test( encr_type=self.ENCR_TYPE_PASSPHRASE, use_peer_id=True) @test_tracker_info(uuid="afcbdc7e-d3a9-465b-b1da-ce2e42e3941e") @WifiBaseTest.wifi_test_wrap def test_oob_passphrase_any(self): """Data-path: out-of-band, passphrase, any peer Verifies end-to-end discovery + data-path creation. 
""" self.run_oob_data_path_test( encr_type=self.ENCR_TYPE_PASSPHRASE, use_peer_id=False) @test_tracker_info(uuid="0d095031-160a-4537-aab5-41b6ad5d55f8") @WifiBaseTest.wifi_test_wrap def test_oob_pmk_specific(self): """Data-path: out-of-band, PMK, specific peer Verifies end-to-end discovery + data-path creation. """ self.run_oob_data_path_test( encr_type=self.ENCR_TYPE_PMK, use_peer_id=True) @test_tracker_info(uuid="e45477bd-66cc-4eb7-88dd-4518c8aa2a74") @WifiBaseTest.wifi_test_wrap def test_oob_pmk_any(self): """Data-path: out-of-band, PMK, any peer Verifies end-to-end discovery + data-path creation. """ self.run_oob_data_path_test( encr_type=self.ENCR_TYPE_PMK, use_peer_id=False) @test_tracker_info(uuid="dd464f24-b404-4eea-955c-d10c9e8adefc") @WifiBaseTest.wifi_test_wrap def test_oob_ib_coex_open_specific(self): """Data-path: out-of-band, open encryption, specific peer - in-band coex: set up a concurrent discovery session to verify no impact. The session consists of Publisher on both ends, and a Subscriber on the Responder. Verifies end-to-end discovery + data-path creation. """ self.run_oob_data_path_test( encr_type=self.ENCR_TYPE_OPEN, use_peer_id=True, setup_discovery_sessions=True) @test_tracker_info(uuid="088fcd3a-b015-4179-a9a5-91f782b03e3b") @WifiBaseTest.wifi_test_wrap def test_oob_ib_coex_open_any(self): """Data-path: out-of-band, open encryption, any peer - in-band coex: set up a concurrent discovery session to verify no impact. The session consists of Publisher on both ends, and a Subscriber on the Responder. Verifies end-to-end discovery + data-path creation. 
""" self.run_oob_data_path_test( encr_type=self.ENCR_TYPE_OPEN, use_peer_id=False, setup_discovery_sessions=True) ############################################################## @test_tracker_info(uuid="1c2c9805-dc1e-43b5-a1b8-315e8c9a4337") @WifiBaseTest.wifi_test_wrap def test_passphrase_min(self): """Data-path: minimum passphrase length Use in-band, unsolicited/passive, any peer combination """ self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_UNSOLICITED, stype=aconsts.SUBSCRIBE_TYPE_PASSIVE, encr_type=self.ENCR_TYPE_PASSPHRASE, use_peer_id=False, passphrase_to_use=self.PASSPHRASE_MIN) @test_tracker_info(uuid="e696e2b9-87a9-4521-b337-61b9efaa2057") @WifiBaseTest.wifi_test_wrap def test_passphrase_max(self): """Data-path: maximum passphrase length Use in-band, unsolicited/passive, any peer combination """ self.run_ib_data_path_test( ptype=aconsts.PUBLISH_TYPE_UNSOLICITED, stype=aconsts.SUBSCRIBE_TYPE_PASSIVE, encr_type=self.ENCR_TYPE_PASSPHRASE, use_peer_id=False, passphrase_to_use=self.PASSPHRASE_MAX) @test_tracker_info(uuid="533cd44c-ff30-4283-ac28-f71fd7b4f02d") @WifiBaseTest.wifi_test_wrap def test_negative_mismatch_publisher_peer_id(self): """Data-path: failure when publisher peer ID is mismatched""" self.run_mismatched_ib_data_path_test( pub_mismatch=True, sub_mismatch=False) @test_tracker_info(uuid="682f275e-722a-4f8b-85e7-0dcea9d25532") @WifiBaseTest.wifi_test_wrap def test_negative_mismatch_subscriber_peer_id(self): """Data-path: failure when subscriber peer ID is mismatched""" self.run_mismatched_ib_data_path_test( pub_mismatch=False, sub_mismatch=True) @test_tracker_info(uuid="7fa82796-7fc9-4d9e-bbbb-84b751788943") @WifiBaseTest.wifi_test_wrap def test_negative_mismatch_init_mac(self): """Data-path: failure when Initiator MAC address mismatch""" self.run_mismatched_oob_data_path_test( init_mismatch_mac=True, resp_mismatch_mac=False) @test_tracker_info(uuid="edeae959-4644-44f9-8d41-bdeb5216954e") @WifiBaseTest.wifi_test_wrap def 
test_negative_mismatch_resp_mac(self): """Data-path: failure when Responder MAC address mismatch""" self.run_mismatched_oob_data_path_test( init_mismatch_mac=False, resp_mismatch_mac=True) @test_tracker_info(uuid="91f46949-c47f-49f9-a90f-6fae699613a7") @WifiBaseTest.wifi_test_wrap def test_negative_mismatch_passphrase(self): """Data-path: failure when passphrases mismatch""" self.run_mismatched_oob_data_path_test( init_encr_type=self.ENCR_TYPE_PASSPHRASE, resp_encr_type=self.ENCR_TYPE_PASSPHRASE) @test_tracker_info(uuid="01c49c2e-dc92-4a27-bb47-c4fc67617c23") @WifiBaseTest.wifi_test_wrap def test_negative_mismatch_pmk(self): """Data-path: failure when PMK mismatch""" self.run_mismatched_oob_data_path_test( init_encr_type=self.ENCR_TYPE_PMK, resp_encr_type=self.ENCR_TYPE_PMK) @test_tracker_info(uuid="4d651797-5fbb-408e-a4b6-a6e1944136da") @WifiBaseTest.wifi_test_wrap def test_negative_mismatch_open_passphrase(self): """Data-path: failure when initiator is open, and responder passphrase""" self.run_mismatched_oob_data_path_test( init_encr_type=self.ENCR_TYPE_OPEN, resp_encr_type=self.ENCR_TYPE_PASSPHRASE) @test_tracker_info(uuid="1ae697f4-5987-4187-aeef-1e22d07d4a7c") @WifiBaseTest.wifi_test_wrap def test_negative_mismatch_open_pmk(self): """Data-path: failure when initiator is open, and responder PMK""" self.run_mismatched_oob_data_path_test( init_encr_type=self.ENCR_TYPE_OPEN, resp_encr_type=self.ENCR_TYPE_PMK) @test_tracker_info(uuid="f027b1cc-0e7a-4075-b880-5e64b288afbd") @WifiBaseTest.wifi_test_wrap def test_negative_mismatch_pmk_passphrase(self): """Data-path: failure when initiator is pmk, and responder passphrase""" self.run_mismatched_oob_data_path_test( init_encr_type=self.ENCR_TYPE_PMK, resp_encr_type=self.ENCR_TYPE_PASSPHRASE) @test_tracker_info(uuid="0819bbd4-72ae-49c4-bd46-5448db2b0a06") @WifiBaseTest.wifi_test_wrap def test_negative_mismatch_passphrase_open(self): """Data-path: failure when initiator is passphrase, and responder open""" 
self.run_mismatched_oob_data_path_test( init_encr_type=self.ENCR_TYPE_PASSPHRASE, resp_encr_type=self.ENCR_TYPE_OPEN) @test_tracker_info(uuid="7ef24f62-8e6b-4732-88a3-80a43584dda4") @WifiBaseTest.wifi_test_wrap def test_negative_mismatch_pmk_open(self): """Data-path: failure when initiator is PMK, and responder open""" self.run_mismatched_oob_data_path_test( init_encr_type=self.ENCR_TYPE_PMK, resp_encr_type=self.ENCR_TYPE_OPEN) @test_tracker_info(uuid="7b9c9efc-1c06-465e-8a5e-d6a22ac1da97") @WifiBaseTest.wifi_test_wrap def test_negative_mismatch_passphrase_pmk(self): """Data-path: failure when initiator is passphrase, and responder pmk""" self.run_mismatched_oob_data_path_test( init_encr_type=self.ENCR_TYPE_PASSPHRASE, resp_encr_type=self.ENCR_TYPE_OPEN) ########################################################################## def wait_for_request_responses(self, dut, req_keys, aware_ifs, aware_ipv6): """Wait for network request confirmation for all request keys. Args: dut: Device under test req_keys: (in) A list of the network requests aware_ifs: (out) A list into which to append the network interface aware_ipv6: (out) A list into which to append the network ipv6 address """ num_events = 0 # looking for 2 events per NDP: link-prop + net-cap while num_events != 2 * len(req_keys): event = autils.wait_for_event( dut, cconsts.EVENT_NETWORK_CALLBACK, timeout=autils.EVENT_NDP_TIMEOUT) if (event["data"][cconsts.NETWORK_CB_KEY_EVENT] == cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED): if event["data"][cconsts.NETWORK_CB_KEY_ID] in req_keys: num_events = num_events + 1 aware_ifs.append( event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]) else: self.log.info( "Received an unexpected connectivity, the revoked " "network request probably went through -- %s", event) elif (event["data"][cconsts.NETWORK_CB_KEY_EVENT] == cconsts.NETWORK_CB_CAPABILITIES_CHANGED): if event["data"][cconsts.NETWORK_CB_KEY_ID] in req_keys: num_events = num_events + 1 
aware_ipv6.append(event["data"][aconsts.NET_CAP_IPV6]) else: self.log.info( "Received an unexpected connectivity, the revoked " "network request probably went through -- %s", event) # validate no leak of information asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in event["data"], "Network specifier leak!") @test_tracker_info(uuid="2e325e2b-d552-4890-b470-20b40284395d") @WifiBaseTest.wifi_test_wrap def test_multiple_identical_networks(self): """Validate that creating multiple networks between 2 devices, each network with identical configuration is supported over a single NDP. Verify that the interface and IPv6 address is the same for all networks. """ init_dut = self.android_devices[0] init_dut.pretty_name = "Initiator" resp_dut = self.android_devices[1] resp_dut.pretty_name = "Responder" N = 2 # first iteration (must be 2 to give us a chance to cancel the first) M = 5 # second iteration init_ids = [] resp_ids = [] # Initiator+Responder: attach and wait for confirmation & identity # create N+M sessions to be used in the different (but identical) NDPs for i in range(N + M): id, init_mac = autils.attach_with_identity(init_dut) init_ids.append(id) id, resp_mac = autils.attach_with_identity(resp_dut) resp_ids.append(id) # wait for devices to synchronize with each other - there are no other # mechanisms to make sure this happens for OOB discovery (except retrying # to execute the data-path request) time.sleep(autils.WAIT_FOR_CLUSTER) resp_req_keys = [] init_req_keys = [] resp_aware_ifs = [] init_aware_ifs = [] resp_aware_ipv6 = [] init_aware_ipv6 = [] # issue N quick requests for identical NDPs - without waiting for result # tests the pre-setup multiple-NDP procedure for i in range(N): # Responder: request network resp_req_keys.append( autils.request_network( resp_dut, resp_dut.droid.wifiAwareCreateNetworkSpecifierOob( resp_ids[i], aconsts.DATA_PATH_RESPONDER, init_mac, None))) # Initiator: request network init_req_keys.append(
autils.request_network( init_dut, init_dut.droid.wifiAwareCreateNetworkSpecifierOob( init_ids[i], aconsts.DATA_PATH_INITIATOR, resp_mac, None))) # remove the first request (hopefully before completed) testing that NDP # is still created resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_keys[0]) resp_req_keys.remove(resp_req_keys[0]) init_dut.droid.connectivityUnregisterNetworkCallback(init_req_keys[0]) init_req_keys.remove(init_req_keys[0]) # wait for network formation for all initial requests # note: for IPv6 Init <--> Resp since each reports the other's IPv6 address # in its transport-specific network info self.wait_for_request_responses(resp_dut, resp_req_keys, resp_aware_ifs, init_aware_ipv6) self.wait_for_request_responses(init_dut, init_req_keys, init_aware_ifs, resp_aware_ipv6) # issue M more requests for the same NDPs - tests post-setup multiple NDP for i in range(M): # Responder: request network resp_req_keys.append( autils.request_network( resp_dut, resp_dut.droid.wifiAwareCreateNetworkSpecifierOob( resp_ids[N + i], aconsts.DATA_PATH_RESPONDER, init_mac, None))) # Initiator: request network init_req_keys.append( autils.request_network( init_dut, init_dut.droid.wifiAwareCreateNetworkSpecifierOob( init_ids[N + i], aconsts.DATA_PATH_INITIATOR, resp_mac, None))) # wait for network formation for all subsequent requests self.wait_for_request_responses(resp_dut, resp_req_keys[N - 1:], resp_aware_ifs, init_aware_ipv6) self.wait_for_request_responses(init_dut, init_req_keys[N - 1:], init_aware_ifs, resp_aware_ipv6) # determine whether all interfaces and ipv6 addresses are identical # (single NDP) init_aware_ifs = list(set(init_aware_ifs)) resp_aware_ifs = list(set(resp_aware_ifs)) init_aware_ipv6 = list(set(init_aware_ipv6)) resp_aware_ipv6 = list(set(resp_aware_ipv6)) self.log.info("Interface names: I=%s, R=%s", init_aware_ifs, resp_aware_ifs) self.log.info("Interface IPv6: I=%s, R=%s", init_aware_ipv6, resp_aware_ipv6) self.log.info("Initiator
requests: %s", init_req_keys) self.log.info("Responder requests: %s", resp_req_keys) asserts.assert_equal( len(init_aware_ifs), 1, "Multiple initiator interfaces") asserts.assert_equal( len(resp_aware_ifs), 1, "Multiple responder interfaces") asserts.assert_equal( len(init_aware_ipv6), 1, "Multiple initiator IPv6 addresses") asserts.assert_equal( len(resp_aware_ipv6), 1, "Multiple responder IPv6 addresses") for i in range( init_dut.aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES]): # note: using get_ipv6_addr (ifconfig method) since want to verify # that interfaces which do not have any NDPs on them do not have # an IPv6 link-local address. if_name = "%s%d" % (aconsts.AWARE_NDI_PREFIX, i) init_ipv6 = autils.get_ipv6_addr(init_dut, if_name) resp_ipv6 = autils.get_ipv6_addr(resp_dut, if_name) asserts.assert_equal( init_ipv6 is None, if_name not in init_aware_ifs, "Initiator interface %s in unexpected state" % if_name) asserts.assert_equal( resp_ipv6 is None, if_name not in resp_aware_ifs, "Responder interface %s in unexpected state" % if_name) # release requests for resp_req_key in resp_req_keys: resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_key) for init_req_key in init_req_keys: init_dut.droid.connectivityUnregisterNetworkCallback(init_req_key) @test_tracker_info(uuid="34cf12e8-5df6-49bd-b384-e9935d89a5b7") @WifiBaseTest.wifi_test_wrap def test_identical_network_from_both_sides(self): """Validate that requesting two identical NDPs (Open) each being initiated from a different side, results in the same/single NDP. Verify that the interface and IPv6 address is the same for all networks. 
""" dut1 = self.android_devices[0] dut2 = self.android_devices[1] id1, mac1 = autils.attach_with_identity(dut1) id2, mac2 = autils.attach_with_identity(dut2) # wait for devices to synchronize with each other - there are no other # mechanisms to make sure this happens for OOB discovery (except retrying # to execute the data-path request) time.sleep(autils.WAIT_FOR_CLUSTER) # first NDP: DUT1 (Init) -> DUT2 (Resp) req_a_resp = autils.request_network( dut2, dut2.droid.wifiAwareCreateNetworkSpecifierOob( id2, aconsts.DATA_PATH_RESPONDER, mac1)) req_a_init = autils.request_network( dut1, dut1.droid.wifiAwareCreateNetworkSpecifierOob( id1, aconsts.DATA_PATH_INITIATOR, mac2)) req_a_resp_event_nc = autils.wait_for_event_with_keys( dut2, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, req_a_resp)) req_a_init_event_nc = autils.wait_for_event_with_keys( dut1, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, req_a_init)) # validate no leak of information asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in req_a_resp_event_nc[ "data"], "Network specifier leak!") asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in req_a_init_event_nc[ "data"], "Network specifier leak!") # note that Init <-> Resp since IPv6 are of peer's!
req_a_ipv6_init = req_a_resp_event_nc["data"][aconsts.NET_CAP_IPV6] req_a_ipv6_resp = req_a_init_event_nc["data"][aconsts.NET_CAP_IPV6] req_a_resp_event_lp = autils.wait_for_event_with_keys( dut2, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, req_a_resp)) req_a_init_event_lp = autils.wait_for_event_with_keys( dut1, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, req_a_init)) req_a_if_resp = req_a_resp_event_lp["data"][ cconsts.NETWORK_CB_KEY_INTERFACE_NAME] req_a_if_init = req_a_init_event_lp["data"][ cconsts.NETWORK_CB_KEY_INTERFACE_NAME] self.log.info("Interface names for A: I=%s, R=%s", req_a_if_init, req_a_if_resp) self.log.info("Interface addresses (IPv6) for A: I=%s, R=%s", req_a_ipv6_init, req_a_ipv6_resp) # second NDP: DUT2 (Init) -> DUT1 (Resp) req_b_resp = autils.request_network( dut1, dut1.droid.wifiAwareCreateNetworkSpecifierOob( id1, aconsts.DATA_PATH_RESPONDER, mac2)) req_b_init = autils.request_network( dut2, dut2.droid.wifiAwareCreateNetworkSpecifierOob( id2, aconsts.DATA_PATH_INITIATOR, mac1)) req_b_resp_event_nc = autils.wait_for_event_with_keys( dut1, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, req_b_resp)) req_b_init_event_nc = autils.wait_for_event_with_keys( dut2, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, req_b_init)) # validate no leak of information asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in req_b_resp_event_nc[ "data"], "Network specifier leak!") asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in req_b_init_event_nc[ "data"], "Network specifier 
leak!") # note that Init <-> Resp since IPv6 are of peer's! req_b_ipv6_init = req_b_resp_event_nc["data"][aconsts.NET_CAP_IPV6] req_b_ipv6_resp = req_b_init_event_nc["data"][aconsts.NET_CAP_IPV6] req_b_resp_event_lp = autils.wait_for_event_with_keys( dut1, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, req_b_resp)) req_b_init_event_lp = autils.wait_for_event_with_keys( dut2, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, req_b_init)) req_b_if_resp = req_b_resp_event_lp["data"][ cconsts.NETWORK_CB_KEY_INTERFACE_NAME] req_b_if_init = req_b_init_event_lp["data"][ cconsts.NETWORK_CB_KEY_INTERFACE_NAME] self.log.info("Interface names for B: I=%s, R=%s", req_b_if_init, req_b_if_resp) self.log.info("Interface addresses (IPv6) for B: I=%s, R=%s", req_b_ipv6_init, req_b_ipv6_resp) # validate equality of NDPs (using interface names & ipv6) asserts.assert_equal(req_a_if_init, req_b_if_resp, "DUT1 NDPs are on different interfaces") asserts.assert_equal(req_a_if_resp, req_b_if_init, "DUT2 NDPs are on different interfaces") asserts.assert_equal(req_a_ipv6_init, req_b_ipv6_resp, "DUT1 NDPs are using different IPv6 addresses") asserts.assert_equal(req_a_ipv6_resp, req_b_ipv6_init, "DUT2 NDPs are using different IPv6 addresses") # release requests dut1.droid.connectivityUnregisterNetworkCallback(req_a_init) dut1.droid.connectivityUnregisterNetworkCallback(req_b_resp) dut2.droid.connectivityUnregisterNetworkCallback(req_a_resp) dut2.droid.connectivityUnregisterNetworkCallback(req_b_init) ######################################################################## def run_multiple_ndi(self, sec_configs, flip_init_resp=False): """Validate that the device can create and use multiple NDIs. 
The security configuration can be: - None: open - String: passphrase - otherwise: PMK (byte array) Args: sec_configs: list of security configurations flip_init_resp: if True the roles of Initiator and Responder are flipped between the 2 devices, otherwise same devices are always configured in the same role. """ dut1 = self.android_devices[0] dut2 = self.android_devices[1] asserts.skip_if( dut1.aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES] < len(sec_configs) or dut2.aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES] < len(sec_configs), "DUTs do not support enough NDIs") id1, mac1 = autils.attach_with_identity(dut1) id2, mac2 = autils.attach_with_identity(dut2) # wait for devices to synchronize with each other - there are no other # mechanisms to make sure this happens for OOB discovery (except retrying # to execute the data-path request) time.sleep(autils.WAIT_FOR_CLUSTER) dut2_req_keys = [] dut1_req_keys = [] dut2_aware_ifs = [] dut1_aware_ifs = [] dut2_aware_ipv6s = [] dut1_aware_ipv6s = [] dut2_type = aconsts.DATA_PATH_RESPONDER dut1_type = aconsts.DATA_PATH_INITIATOR dut2_is_responder = True for sec in sec_configs: if dut2_is_responder: # DUT2 (Responder): request network dut2_req_key = autils.request_network( dut2, autils.get_network_specifier(dut2, id2, dut2_type, mac1, sec)) dut2_req_keys.append(dut2_req_key) # DUT1 (Initiator): request network dut1_req_key = autils.request_network( dut1, autils.get_network_specifier(dut1, id1, dut1_type, mac2, sec)) dut1_req_keys.append(dut1_req_key) else: # DUT1 (Responder): request network dut1_req_key = autils.request_network( dut1, autils.get_network_specifier(dut1, id1, dut1_type, mac2, sec)) dut1_req_keys.append(dut1_req_key) # DUT2 (Initiator): request network dut2_req_key = autils.request_network( dut2, autils.get_network_specifier(dut2, id2, dut2_type, mac1, sec)) dut2_req_keys.append(dut2_req_key) # Wait for network dut1_net_event_nc = autils.wait_for_event_with_keys( dut1, cconsts.EVENT_NETWORK_CALLBACK,
autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, dut1_req_key)) dut2_net_event_nc = autils.wait_for_event_with_keys( dut2, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, dut2_req_key)) # validate no leak of information asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in dut1_net_event_nc[ "data"], "Network specifier leak!") asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in dut2_net_event_nc[ "data"], "Network specifier leak!") # Note: dut1 <--> dut2 IPv6's addresses since it is peer's info dut2_aware_ipv6 = dut1_net_event_nc["data"][aconsts.NET_CAP_IPV6] dut1_aware_ipv6 = dut2_net_event_nc["data"][aconsts.NET_CAP_IPV6] dut1_net_event_lp = autils.wait_for_event_with_keys( dut1, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, dut1_req_key)) dut2_net_event_lp = autils.wait_for_event_with_keys( dut2, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, dut2_req_key)) dut2_aware_if = dut2_net_event_lp["data"][ cconsts.NETWORK_CB_KEY_INTERFACE_NAME] dut1_aware_if = dut1_net_event_lp["data"][ cconsts.NETWORK_CB_KEY_INTERFACE_NAME] dut2_aware_ifs.append(dut2_aware_if) dut1_aware_ifs.append(dut1_aware_if) dut2_aware_ipv6s.append(dut2_aware_ipv6) dut1_aware_ipv6s.append(dut1_aware_ipv6) if flip_init_resp: if dut2_is_responder: dut2_type = aconsts.DATA_PATH_INITIATOR dut1_type = aconsts.DATA_PATH_RESPONDER else: dut2_type = aconsts.DATA_PATH_RESPONDER dut1_type = aconsts.DATA_PATH_INITIATOR dut2_is_responder = not dut2_is_responder # check that we are using 2 NDIs & that they have unique IPv6 addresses dut1_aware_ifs = 
list(set(dut1_aware_ifs)) dut2_aware_ifs = list(set(dut2_aware_ifs)) dut1_aware_ipv6s = list(set(dut1_aware_ipv6s)) dut2_aware_ipv6s = list(set(dut2_aware_ipv6s)) self.log.info("Interface names: DUT1=%s, DUT2=%s", dut1_aware_ifs, dut2_aware_ifs) self.log.info("IPv6 addresses: DUT1=%s, DUT2=%s", dut1_aware_ipv6s, dut2_aware_ipv6s) self.log.info("DUT1 requests: %s", dut1_req_keys) self.log.info("DUT2 requests: %s", dut2_req_keys) asserts.assert_equal( len(dut1_aware_ifs), len(sec_configs), "Multiple DUT1 interfaces") asserts.assert_equal( len(dut2_aware_ifs), len(sec_configs), "Multiple DUT2 interfaces") asserts.assert_equal( len(dut1_aware_ipv6s), len(sec_configs), "Multiple DUT1 IPv6 addresses") asserts.assert_equal( len(dut2_aware_ipv6s), len(sec_configs), "Multiple DUT2 IPv6 addresses") for i in range(len(sec_configs)): # note: using get_ipv6_addr (ifconfig method) since want to verify # that the system information is the same as the reported information if_name = "%s%d" % (aconsts.AWARE_NDI_PREFIX, i) dut1_ipv6 = autils.get_ipv6_addr(dut1, if_name) dut2_ipv6 = autils.get_ipv6_addr(dut2, if_name) asserts.assert_equal( dut1_ipv6 is None, if_name not in dut1_aware_ifs, "DUT1 interface %s in unexpected state" % if_name) asserts.assert_equal( dut2_ipv6 is None, if_name not in dut2_aware_ifs, "DUT2 interface %s in unexpected state" % if_name) # release requests for dut2_req_key in dut2_req_keys: dut2.droid.connectivityUnregisterNetworkCallback(dut2_req_key) for dut1_req_key in dut1_req_keys: dut1.droid.connectivityUnregisterNetworkCallback(dut1_req_key) @test_tracker_info(uuid="2d728163-11cc-46ba-a973-c8e1e71397fc") @WifiBaseTest.wifi_test_wrap def test_multiple_ndi_open_passphrase(self): """Verify that between 2 DUTs can create 2 NDPs with different security configuration (one open, one using passphrase). 
The result should use two different NDIs""" self.run_multiple_ndi([None, self.PASSPHRASE]) @test_tracker_info(uuid="5f2c32aa-20b2-41f0-8b1e-d0b68df73ada") @WifiBaseTest.wifi_test_wrap def test_multiple_ndi_open_pmk(self): """Verify that between 2 DUTs can create 2 NDPs with different security configuration (one open, one using pmk). The result should use two different NDIs""" self.run_multiple_ndi([None, self.PMK]) @test_tracker_info(uuid="34467659-bcfb-40cd-ba25-7e50560fca63") @WifiBaseTest.wifi_test_wrap def test_multiple_ndi_passphrase_pmk(self): """Verify that between 2 DUTs can create 2 NDPs with different security configuration (one using passphrase, one using pmk). The result should use two different NDIs""" self.run_multiple_ndi([self.PASSPHRASE, self.PMK]) @test_tracker_info(uuid="d9194ce6-45b6-41b1-9cc8-ada79968966d") @WifiBaseTest.wifi_test_wrap def test_multiple_ndi_passphrases(self): """Verify that between 2 DUTs can create 2 NDPs with different security configuration (using different passphrases). The result should use two different NDIs""" self.run_multiple_ndi([self.PASSPHRASE, self.PASSPHRASE2]) @test_tracker_info(uuid="879df795-62d2-40d4-a862-bd46d8f7e67f") @WifiBaseTest.wifi_test_wrap def test_multiple_ndi_pmks(self): """Verify that between 2 DUTs can create 2 NDPs with different security configuration (using different PMKS). The result should use two different NDIs""" self.run_multiple_ndi([self.PMK, self.PMK2]) @test_tracker_info(uuid="397d380a-8e41-466e-9ccb-cf8f413d83ba") @WifiBaseTest.wifi_test_wrap def test_multiple_ndi_open_passphrase_flip(self): """Verify that between 2 DUTs can create 2 NDPs with different security configuration (one open, one using passphrase). The result should use two different NDIs. Flip Initiator and Responder roles. 
""" self.run_multiple_ndi([None, self.PASSPHRASE], flip_init_resp=True) @test_tracker_info(uuid="b3a4300b-1514-4cb8-a814-9c2baa449700") @WifiBaseTest.wifi_test_wrap def test_multiple_ndi_open_pmk_flip(self): """Verify that between 2 DUTs can create 2 NDPs with different security configuration (one open, one using pmk). The result should use two different NDIs Flip Initiator and Responder roles. """ self.run_multiple_ndi([None, self.PMK], flip_init_resp=True) @test_tracker_info(uuid="0bfea9e4-e57d-417f-8db4-245741e9bbd5") @WifiBaseTest.wifi_test_wrap def test_multiple_ndi_passphrase_pmk_flip(self): """Verify that between 2 DUTs can create 2 NDPs with different security configuration (one using passphrase, one using pmk). The result should use two different NDIs Flip Initiator and Responder roles. """ self.run_multiple_ndi([self.PASSPHRASE, self.PMK], flip_init_resp=True) @test_tracker_info(uuid="74023483-5417-431b-a362-991ad4a03ab8") @WifiBaseTest.wifi_test_wrap def test_multiple_ndi_passphrases_flip(self): """Verify that between 2 DUTs can create 2 NDPs with different security configuration (using different passphrases). The result should use two different NDIs Flip Initiator and Responder roles. """ self.run_multiple_ndi( [self.PASSPHRASE, self.PASSPHRASE2], flip_init_resp=True) @test_tracker_info(uuid="873b2d91-28a1-403f-ae9c-d756bb2f59ee") @WifiBaseTest.wifi_test_wrap def test_multiple_ndi_pmks_flip(self): """Verify that between 2 DUTs can create 2 NDPs with different security configuration (using different PMKS). The result should use two different NDIs Flip Initiator and Responder roles. """ self.run_multiple_ndi([self.PMK, self.PMK2], flip_init_resp=True) ####################################### def run_multiple_regulatory_domains(self, use_ib, init_domain, resp_domain): """Verify that a data-path setup with two conflicting regulatory domains works (the result should be run in Channel 6 - but that is not tested). 
Args: use_ib: True to use in-band discovery, False to use out-of-band discovery. init_domain: The regulatory domain of the Initiator/Subscriber. resp_domain: The regulator domain of the Responder/Publisher. """ init_dut = self.android_devices[0] resp_dut = self.android_devices[1] wutils.set_wifi_country_code(init_dut, init_domain) wutils.set_wifi_country_code(resp_dut, resp_domain) if use_ib: (resp_req_key, init_req_key, resp_aware_if, init_aware_if, resp_ipv6, init_ipv6) = autils.create_ib_ndp( resp_dut, init_dut, autils.create_discovery_config( "GoogleTestXyz", aconsts.PUBLISH_TYPE_UNSOLICITED), autils.create_discovery_config( "GoogleTestXyz", aconsts.SUBSCRIBE_TYPE_PASSIVE), self.device_startup_offset) else: (init_req_key, resp_req_key, init_aware_if, resp_aware_if, init_ipv6, resp_ipv6) = autils.create_oob_ndp(init_dut, resp_dut) self.log.info("Interface names: I=%s, R=%s", init_aware_if, resp_aware_if) self.log.info("Interface addresses (IPv6): I=%s, R=%s", init_ipv6, resp_ipv6) # clean-up resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_key) init_dut.droid.connectivityUnregisterNetworkCallback(init_req_key) @test_tracker_info(uuid="eff53739-35c5-47a6-81f0-d70b51d89c3b") @WifiBaseTest.wifi_test_wrap def test_multiple_regulator_domains_ib_us_jp(self): """Verify data-path setup across multiple regulator domains. - Uses in-band discovery - Subscriber=US, Publisher=JP """ self.run_multiple_regulatory_domains( use_ib=True, init_domain=wutils.WifiEnums.CountryCode.US, resp_domain=wutils.WifiEnums.CountryCode.JAPAN) @test_tracker_info(uuid="19af47cc-3204-40ef-b50f-14cf7b89cf4a") @WifiBaseTest.wifi_test_wrap def test_multiple_regulator_domains_ib_jp_us(self): """Verify data-path setup across multiple regulator domains. 
- Uses in-band discovery - Subscriber=JP, Publisher=US """ self.run_multiple_regulatory_domains( use_ib=True, init_domain=wutils.WifiEnums.CountryCode.JAPAN, resp_domain=wutils.WifiEnums.CountryCode.US) @test_tracker_info(uuid="65285ab3-977f-4dbd-b663-d5a02f4fc663") @WifiBaseTest.wifi_test_wrap def test_multiple_regulator_domains_oob_us_jp(self): """Verify data-path setup across multiple regulator domains. - Uses out-of-band discovery - Initiator=US, Responder=JP """ self.run_multiple_regulatory_domains( use_ib=False, init_domain=wutils.WifiEnums.CountryCode.US, resp_domain=wutils.WifiEnums.CountryCode.JAPAN) @test_tracker_info(uuid="8a417e24-aaf6-44b9-a089-a07c3ba8d954") @WifiBaseTest.wifi_test_wrap def test_multiple_regulator_domains_oob_jp_us(self): """Verify data-path setup across multiple regulator domains. - Uses out-of-band discovery - Initiator=JP, Responder=US """ self.run_multiple_regulatory_domains( use_ib=False, init_domain=wutils.WifiEnums.CountryCode.JAPAN, resp_domain=wutils.WifiEnums.CountryCode.US) ######################################################################## def run_mix_ib_oob(self, same_request, ib_first, inits_on_same_dut): """Validate that multiple network requests issued using both in-band and out-of-band discovery behave as expected. The same_request parameter controls whether identical single NDP is expected, if True, or whether multiple NDPs on different NDIs are expected, if False. Args: same_request: Issue canonically identical requests (same NMI peer, same passphrase) if True, if False use different passphrases. ib_first: If True then the in-band network is requested first, otherwise (if False) then the out-of-band network is requested first. inits_on_same_dut: If True then the Initiators are run on the same device, otherwise (if False) then the Initiators are run on different devices. Note that Subscribe == Initiator.
""" if not same_request: asserts.skip_if( self.android_devices[0] .aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES] < 2 or self.android_devices[1] .aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES] < 2, "DUTs do not support enough NDIs") (p_dut, s_dut, p_id, s_id, p_disc_id, s_disc_id, peer_id_on_sub, peer_id_on_pub) = self.set_up_discovery( aconsts.PUBLISH_TYPE_UNSOLICITED, aconsts.SUBSCRIBE_TYPE_PASSIVE, True) p_id2, p_mac = autils.attach_with_identity(p_dut) s_id2, s_mac = autils.attach_with_identity(s_dut) if inits_on_same_dut: resp_dut = p_dut resp_id = p_id2 resp_mac = p_mac init_dut = s_dut init_id = s_id2 init_mac = s_mac else: resp_dut = s_dut resp_id = s_id2 resp_mac = s_mac init_dut = p_dut init_id = p_id2 init_mac = p_mac passphrase = None if same_request else self.PASSPHRASE if ib_first: # request in-band network (to completion) p_req_key = self.request_network( p_dut, p_dut.droid.wifiAwareCreateNetworkSpecifier(p_disc_id, peer_id_on_pub)) s_req_key = self.request_network( s_dut, s_dut.droid.wifiAwareCreateNetworkSpecifier(s_disc_id, peer_id_on_sub)) # Publisher & Subscriber: wait for network formation p_net_event_nc = autils.wait_for_event_with_keys( p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, p_req_key)) s_net_event_nc = autils.wait_for_event_with_keys( s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, s_req_key)) p_net_event_lp = autils.wait_for_event_with_keys( p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, p_req_key)) s_net_event_lp = autils.wait_for_event_with_keys( s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, 
cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, s_req_key)) # validate no leak of information asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in p_net_event_nc[ "data"], "Network specifier leak!") asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in s_net_event_nc[ "data"], "Network specifier leak!") # request out-of-band network resp_req_key = autils.request_network( resp_dut, resp_dut.droid.wifiAwareCreateNetworkSpecifierOob( resp_id, aconsts.DATA_PATH_RESPONDER, init_mac, passphrase)) init_req_key = autils.request_network( init_dut, init_dut.droid.wifiAwareCreateNetworkSpecifierOob( init_id, aconsts.DATA_PATH_INITIATOR, resp_mac, passphrase)) resp_net_event_nc = autils.wait_for_event_with_keys( resp_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, resp_req_key)) init_net_event_nc = autils.wait_for_event_with_keys( init_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, init_req_key)) resp_net_event_lp = autils.wait_for_event_with_keys( resp_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, resp_req_key)) init_net_event_lp = autils.wait_for_event_with_keys( init_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, init_req_key)) # validate no leak of information asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in resp_net_event_nc[ "data"], "Network specifier leak!") asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in init_net_event_nc[ "data"], "Network specifier leak!") if not ib_first: # request in-band network (to completion) 
p_req_key = self.request_network( p_dut, p_dut.droid.wifiAwareCreateNetworkSpecifier(p_disc_id, peer_id_on_pub)) s_req_key = self.request_network( s_dut, s_dut.droid.wifiAwareCreateNetworkSpecifier(s_disc_id, peer_id_on_sub)) # Publisher & Subscriber: wait for network formation p_net_event_nc = autils.wait_for_event_with_keys( p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, p_req_key)) s_net_event_nc = autils.wait_for_event_with_keys( s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, s_req_key)) p_net_event_lp = autils.wait_for_event_with_keys( p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, p_req_key)) s_net_event_lp = autils.wait_for_event_with_keys( s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, s_req_key)) # validate no leak of information asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in p_net_event_nc[ "data"], "Network specifier leak!") asserts.assert_false( cconsts.NETWORK_CB_KEY_NETWORK_SPECIFIER in s_net_event_nc[ "data"], "Network specifier leak!") # note that Init <-> Resp & Pub <--> Sub since IPv6 are of peer's! 
init_ipv6 = resp_net_event_nc["data"][aconsts.NET_CAP_IPV6] resp_ipv6 = init_net_event_nc["data"][aconsts.NET_CAP_IPV6] pub_ipv6 = s_net_event_nc["data"][aconsts.NET_CAP_IPV6] sub_ipv6 = p_net_event_nc["data"][aconsts.NET_CAP_IPV6] # extract net info pub_interface = p_net_event_lp["data"][ cconsts.NETWORK_CB_KEY_INTERFACE_NAME] sub_interface = s_net_event_lp["data"][ cconsts.NETWORK_CB_KEY_INTERFACE_NAME] resp_interface = resp_net_event_lp["data"][ cconsts.NETWORK_CB_KEY_INTERFACE_NAME] init_interface = init_net_event_lp["data"][ cconsts.NETWORK_CB_KEY_INTERFACE_NAME] self.log.info("Interface names: Pub=%s, Sub=%s, Resp=%s, Init=%s", pub_interface, sub_interface, resp_interface, init_interface) self.log.info( "Interface addresses (IPv6): Pub=%s, Sub=%s, Resp=%s, Init=%s", pub_ipv6, sub_ipv6, resp_ipv6, init_ipv6) # validate NDP/NDI conditions (using interface names & ipv6) if same_request: asserts.assert_equal( pub_interface, resp_interface if inits_on_same_dut else init_interface, "NDP interfaces don't match on Pub/other") asserts.assert_equal( sub_interface, init_interface if inits_on_same_dut else resp_interface, "NDP interfaces don't match on Sub/other") asserts.assert_equal(pub_ipv6, resp_ipv6 if inits_on_same_dut else init_ipv6, "NDP IPv6 don't match on Pub/other") asserts.assert_equal(sub_ipv6, init_ipv6 if inits_on_same_dut else resp_ipv6, "NDP IPv6 don't match on Sub/other") else: asserts.assert_false( pub_interface == (resp_interface if inits_on_same_dut else init_interface), "NDP interfaces match on Pub/other") asserts.assert_false( sub_interface == (init_interface if inits_on_same_dut else resp_interface), "NDP interfaces match on Sub/other") asserts.assert_false( pub_ipv6 == (resp_ipv6 if inits_on_same_dut else init_ipv6), "NDP IPv6 match on Pub/other") asserts.assert_false( sub_ipv6 == (init_ipv6 if inits_on_same_dut else resp_ipv6), "NDP IPv6 match on Sub/other") # release requests p_dut.droid.connectivityUnregisterNetworkCallback(p_req_key) 
s_dut.droid.connectivityUnregisterNetworkCallback(s_req_key) resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_key) init_dut.droid.connectivityUnregisterNetworkCallback(init_req_key) @test_tracker_info(uuid="d8a0839d-4ba0-43f2-af93-3cf1382f9f16") @WifiBaseTest.wifi_test_wrap def test_identical_ndps_mix_ib_oob_ib_first_same_polarity(self): """Validate that a single NDP is created for multiple identical requests which are issued through either in-band (ib) or out-of-band (oob) APIs. The in-band request is issued first. Both Initiators (Sub == Initiator) are run on the same device. """ self.run_mix_ib_oob( same_request=True, ib_first=True, inits_on_same_dut=True) @test_tracker_info(uuid="70bbb811-0bed-4a19-96b3-f2446e777c8a") @WifiBaseTest.wifi_test_wrap def test_identical_ndps_mix_ib_oob_oob_first_same_polarity(self): """Validate that a single NDP is created for multiple identical requests which are issued through either in-band (ib) or out-of-band (oob) APIs. The out-of-band request is issued first. Both Initiators (Sub == Initiator) are run on the same device. """ self.run_mix_ib_oob( same_request=True, ib_first=False, inits_on_same_dut=True) @test_tracker_info(uuid="d9796da5-f96a-4a51-be0f-89d6f5bfe3ad") @WifiBaseTest.wifi_test_wrap def test_identical_ndps_mix_ib_oob_ib_first_diff_polarity(self): """Validate that a single NDP is created for multiple identical requests which are issued through either in-band (ib) or out-of-band (oob) APIs. The in-band request is issued first. Initiators (Sub == Initiator) are run on different devices. """ self.run_mix_ib_oob( same_request=True, ib_first=True, inits_on_same_dut=False) @test_tracker_info(uuid="48b9005b-7851-4222-b41c-1fcbefbc704d") @WifiBaseTest.wifi_test_wrap def test_identical_ndps_mix_ib_oob_oob_first_diff_polarity(self): """Validate that a single NDP is created for multiple identical requests which are issued through either in-band (ib) or out-of-band (oob) APIs. 
The out-of-band request is issued first. Initiators (Sub == Initiator) are run on different devices. """ self.run_mix_ib_oob( same_request=True, ib_first=False, inits_on_same_dut=False) @test_tracker_info(uuid="51f9581e-c5ee-48a7-84d2-adff4876c3d7") @WifiBaseTest.wifi_test_wrap def test_multiple_ndis_mix_ib_oob_ib_first_same_polarity(self): """Validate that multiple NDIs are created for NDPs which are requested with different security configurations. Use a mix of in-band and out-of-band APIs to request the different NDPs. The in-band request is issued first. Initiators (Sub == Initiator) are run on the same device. """ self.run_mix_ib_oob( same_request=False, ib_first=True, inits_on_same_dut=True) @test_tracker_info(uuid="b1e3070e-4d38-4b31-862d-39b82e0f2853") @WifiBaseTest.wifi_test_wrap def test_multiple_ndis_mix_ib_oob_oob_first_same_polarity(self): """Validate that multiple NDIs are created for NDPs which are requested with different security configurations. Use a mix of in-band and out-of-band APIs to request the different NDPs. The out-of-band request is issued first. Initiators (Sub == Initiator) are run on the same device. """ self.run_mix_ib_oob( same_request=False, ib_first=False, inits_on_same_dut=True) @test_tracker_info(uuid="b1e3070e-4d38-4b31-862d-39b82e0f2853") @WifiBaseTest.wifi_test_wrap def test_multiple_ndis_mix_ib_oob_ib_first_diff_polarity(self): """Validate that multiple NDIs are created for NDPs which are requested with different security configurations. Use a mix of in-band and out-of-band APIs to request the different NDPs. The in-band request is issued first. Initiators (Sub == Initiator) are run on different devices. 
""" self.run_mix_ib_oob( same_request=False, ib_first=True, inits_on_same_dut=False) @test_tracker_info(uuid="596caadf-028e-494b-bbce-8304ccec2cbb") @WifiBaseTest.wifi_test_wrap def test_multiple_ndis_mix_ib_oob_oob_first_diff_polarity(self): """Validate that multiple NDIs are created for NDPs which are requested with different security configurations. Use a mix of in-band and out-of-band APIs to request the different NDPs. The out-of-band request is issued first. Initiators (Sub == Initiator) are run on different devices. """ self.run_mix_ib_oob( same_request=False, ib_first=False, inits_on_same_dut=False) ######################################################################## @test_tracker_info(uuid="5ec10bf9-bfda-4093-8344-7ccc7764737e") def test_ndp_loop(self): """Validate that can create a loop (chain) of N NDPs between N devices, where N >= 3, e.g. A - B B - C C - A The NDPs are all OPEN (no encryption). """ asserts.skip_if( len(self.android_devices) < 3, 'A minimum of 3 devices is needed to run the test, have %d' % len( self.android_devices)) asserts.skip_if( self.android_devices[0] .aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES] < 2 or self.android_devices[1] .aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES] < 2 or self.android_devices[2] .aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES] < 2, "DUTs do not support enough NDIs") duts = self.android_devices loop_len = len(duts) ids = [] macs = [] reqs = [] ifs = [] ipv6s = [] for i in range(loop_len): duts[i].pretty_name = chr(ord("A") + i) reqs.append([]) ifs.append([]) ipv6s.append([]) # start-up 3 devices (attach w/ identity) for i in range(loop_len): ids.append(duts[i].droid.wifiAwareAttach(True)) autils.wait_for_event(duts[i], aconsts.EVENT_CB_ON_ATTACHED) ident_event = autils.wait_for_event( duts[i], aconsts.EVENT_CB_ON_IDENTITY_CHANGED) macs.append(ident_event['data']['mac']) # wait for for devices to synchronize with each other - there are no other # mechanisms to make sure this happens for 
OOB discovery (except retrying # to execute the data-path request) time.sleep(autils.WAIT_FOR_CLUSTER) # create the N NDPs: i to (i+1) % N for i in range(loop_len): peer_device = (i + 1) % loop_len (init_req_key, resp_req_key, init_aware_if, resp_aware_if, init_ipv6, resp_ipv6) = autils.create_oob_ndp_on_sessions( duts[i], duts[peer_device], ids[i], macs[i], ids[peer_device], macs[peer_device]) reqs[i].append(init_req_key) reqs[peer_device].append(resp_req_key) ifs[i].append(init_aware_if) ifs[peer_device].append(resp_aware_if) ipv6s[i].append(init_ipv6) ipv6s[peer_device].append(resp_ipv6) # clean-up for i in range(loop_len): for req in reqs[i]: duts[i].droid.connectivityUnregisterNetworkCallback(req) # info self.log.info("MACs: %s", macs) self.log.info("Interface names: %s", ifs) self.log.info("IPv6 addresses: %s", ipv6s) asserts.explicit_pass( "NDP loop test", extras={ "macs": macs, "ifs": ifs, "ipv6s": ipv6s }) @test_tracker_info(uuid="88cd288f-71ad-40fe-94d7-34e60fb6c962") def test_ndp_initiate_from_both_sides_with_accepts_any_responder(self): """Validate when two device both try to initiate a connection to each other, both NDP can be formed. 1. Device A and B both publish and file accept any request. 2. Device A and B both subscribe to the peer 3. When get a discovery match file a network request to the peer 4. Wait for NDP setup finish. 
""" asserts.skip_if( self.android_devices[0] .aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES] < 2 or self.android_devices[1] .aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES] < 2, "DUTs do not support enough NDIs.") ptype = aconsts.PUBLISH_TYPE_UNSOLICITED stype = aconsts.SUBSCRIBE_TYPE_PASSIVE dut1 = self.android_devices[0] dut2 = self.android_devices[1] # Publisher+Subscriber: attach and wait for confirmation dut1_id = dut1.droid.wifiAwareAttach() autils.wait_for_event(dut1, aconsts.EVENT_CB_ON_ATTACHED) time.sleep(self.device_startup_offset) dut2_id = dut2.droid.wifiAwareAttach() autils.wait_for_event(dut2, aconsts.EVENT_CB_ON_ATTACHED) # Publisher: start publish and wait for confirmation dut1_p_disc_id = dut1.droid.wifiAwarePublish(dut1_id, self.create_config(ptype)) autils.wait_for_event(dut1, aconsts.SESSION_CB_ON_PUBLISH_STARTED) dut2_p_disc_id = dut2.droid.wifiAwarePublish(dut2_id, self.create_config(ptype)) autils.wait_for_event(dut2, aconsts.SESSION_CB_ON_PUBLISH_STARTED) # Publisher: file an accept any NetworkRequest dut1_p_req_key = self.request_network( dut1, dut1.droid.wifiAwareCreateNetworkSpecifier( dut1_p_disc_id, None, self.PASSPHRASE, None)) dut2_p_req_key = self.request_network( dut2, dut2.droid.wifiAwareCreateNetworkSpecifier( dut2_p_disc_id, None, self.PASSPHRASE, None)) # Subscriber: start subscribe and wait for discovery match dut2_s_disc_id = dut2.droid.wifiAwareSubscribe(dut2_id, self.create_config(stype)) autils.wait_for_event(dut2, aconsts.SESSION_CB_ON_SUBSCRIBE_STARTED) dut1_s_disc_id = dut1.droid.wifiAwareSubscribe(dut1_id, self.create_config(stype)) autils.wait_for_event(dut1, aconsts.SESSION_CB_ON_SUBSCRIBE_STARTED) discovery_event = autils.wait_for_event( dut2, aconsts.SESSION_CB_ON_SERVICE_DISCOVERED) dut2_peer_id_on_sub = discovery_event["data"][ aconsts.SESSION_CB_KEY_PEER_ID] discovery_event = autils.wait_for_event( dut1, aconsts.SESSION_CB_ON_SERVICE_DISCOVERED) dut1_peer_id_on_sub = discovery_event["data"][ 
aconsts.SESSION_CB_KEY_PEER_ID] # Subscriber: file a NetworkRequest with peer Id dut2_s_req_key = self.request_network( dut2, dut2.droid.wifiAwareCreateNetworkSpecifier( dut2_s_disc_id, dut2_peer_id_on_sub, self.PASSPHRASE, None)) # Avoid publisher and subscriber requests are handled in the same NDP time.sleep(1) dut1_s_req_key = self.request_network( dut1, dut1.droid.wifiAwareCreateNetworkSpecifier( dut1_s_disc_id, dut1_peer_id_on_sub, self.PASSPHRASE, None)) # Publisher+Subscriber: wait and check the network callback. autils.wait_for_event_with_keys( dut1, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, dut1_p_req_key)) autils.wait_for_event_with_keys( dut2, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, dut2_s_req_key)) autils.wait_for_event_with_keys( dut2, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, dut2_p_req_key)) autils.wait_for_event_with_keys( dut1, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_CAPABILITIES_CHANGED), (cconsts.NETWORK_CB_KEY_ID, dut1_s_req_key))