# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Wi-Fi Aware discovery ranging test module."""

import logging
import sys
import time

from android.platform.test.annotations import ApiTest
from mobly import asserts
from mobly import base_test
from mobly import records
from mobly import test_runner
from mobly import utils
from mobly.controllers import android_device
import wifi_test_utils

from aware import aware_snippet_utils
from aware import constants


_SNIPPET_PACKAGE_NAME = constants.WIFI_SNIPPET_PACKAGE_NAME
_DEFAULT_TIMEOUT = constants.WAIT_WIFI_STATE_TIME_OUT.total_seconds()
# Both message templates are rendered with `str.format(random_id=...)`, so
# they must use the `{random_id}` placeholder (a `%s` placeholder would be
# sent verbatim).
_MSG_SUB_TO_PUB = "Let's talk [Random Identifier: {random_id}]"
_MSG_PUB_TO_SUB = 'Ready [Random Identifier: {random_id}]'
_LARGE_ENOUGH_DISTANCE_MM = 100000  # 100 meters
_MIN_RSSI = -100
_WAIT_SEC_FOR_RTT_INITIATOR_RESPONDER_SWITCH = 5


@ApiTest(
    apis=[
        'android.net.wifi.rtt.RangingRequest.Builder#addWifiAwarePeer(android.net.wifi.aware.PeerHandle)',
        'android.net.wifi.aware.PublishConfig.Builder#setRangingEnabled(boolean)',
        'android.net.wifi.aware.SubscribeConfig.Builder#setMaxDistanceMm(int)',
        'android.net.wifi.rtt.WifiRttManager#startRanging(android.net.wifi.rtt.RangingRequest, java.util.concurrent.Executor, android.net.wifi.rtt.RangingResultCallback)',
        'android.net.wifi.aware.WifiAwareManager#attach(android.net.wifi.aware.AttachCallback, android.net.wifi.aware.IdentityChangedListener, android.os.Handler)',
        'android.net.wifi.aware.WifiAwareSession#publish(android.net.wifi.aware.PublishConfig, android.net.wifi.aware.DiscoverySessionCallback, android.os.Handler)',
        'android.net.wifi.aware.WifiAwareSession#subscribe(android.net.wifi.aware.SubscribeConfig, android.net.wifi.aware.DiscoverySessionCallback, android.os.Handler)',
    ]
)
class WifiAwareDiscoveryRangingTest(base_test.BaseTestClass):
    """Wi-Fi Aware discovery ranging test class.

    All tests in this class share the same test steps and expected results.
    The difference is that different tests perform ranging with different
    configurations.

    Test Preconditions:
        * Two Android devices that support Wi-Fi Aware and Wi-Fi RTT.
        * The devices should be placed ~20cm apart.

    Test Steps:
        1. Attach a Wi-Fi Aware session on each device.
        2. Publisher publishes a Wi-Fi Aware service, subscriber subscribes
           to it. Wait for service discovery.
        3. Send messages through discovery session's API.
        4. Perform ranging on one device to another device through Wi-Fi
           Aware. Perform this step on the publisher and subscriber,
           respectively.

    Expected Results:
        Discovery ranging succeeds. In ranging result, the device that
        performs ranging discovers the expected peer device, with a valid
        distance and RSSI value.
    """

    ads: list[android_device.AndroidDevice]
    publisher: android_device.AndroidDevice
    subscriber: android_device.AndroidDevice

    # Wi-Fi Aware attach session ID
    pub_attach_session: str | None = None
    sub_attach_session: str | None = None
    # Wi-Fi Aware discovery session ID
    pub_session: str | None = None
    sub_session: str | None = None

    def setup_class(self) -> None:
        """Registers two devices, sets them up, and checks capabilities.

        The first registered device acts as the publisher and the second as
        the subscriber. The whole class is aborted if either device lacks
        Wi-Fi Aware support/availability or Wi-Fi RTT support.
        """
        # Register Android devices, then set them up in parallel.
        self.ads = self.register_controller(android_device, min_number=2)
        self.publisher = self.ads[0]
        self.subscriber = self.ads[1]

        # Device setup
        utils.concurrent_exec(
            self._setup_device,
            ((self.publisher,), (self.subscriber,)),
            max_workers=2,
            raise_on_exception=True,
        )

        # Device capability check
        for device in [self.publisher, self.subscriber]:
            asserts.abort_class_if(
                not device.wifi.wifiAwareIsSupported(),
                f'{device} does not support Wi-Fi Aware.',
            )
            asserts.abort_class_if(
                not device.wifi.wifiAwareIsAvailable(),
                f'Wi-Fi Aware is not available on {device}.',
            )
            asserts.abort_class_if(
                not device.wifi.wifiAwareIsRttSupported(),
                f'{device} does not support Wi-Fi RTT.',
            )

    def _setup_device(self, device: android_device.AndroidDevice) -> None:
        """Loads the Wi-Fi snippet on a device and prepares it for testing.

        Args:
            device: The Android device controller to set up.
        """
        device.load_snippet('wifi', _SNIPPET_PACKAGE_NAME)
        device.wifi.wifiEnable()
        wifi_test_utils.set_screen_on_and_unlock(device)
        wifi_test_utils.enable_wifi_verbose_logging(device)

    def test_discovery_ranging_to_peer_handle(self) -> None:
        """Test ranging to a Wi-Fi Aware peer handle.

        See class docstring for the test steps and expected results.
        """
        pub_config = constants.PublishConfig(
            publish_type=constants.PublishType.UNSOLICITED,
            ranging_enabled=True,
        )
        sub_config = constants.SubscribeConfig(
            subscribe_type=constants.SubscribeType.PASSIVE,
            max_distance_mm=_LARGE_ENOUGH_DISTANCE_MM,
        )

        # Step 1 - 3: Set up Wi-Fi Aware discovery sessions, so it is ready
        # for ranging.
        _, _, pub_peer, sub_peer = self._setup_discovery_sessions(
            pub_config=pub_config,
            sub_config=sub_config,
        )

        # Step 4: Perform ranging on the publisher and subscriber,
        # respectively.
        self.publisher.log.info('Performing ranging to peer ID %d.', pub_peer)
        self._perform_ranging(
            self.publisher,
            constants.RangingRequest(peer_ids=[pub_peer]),
        )

        # RTT initiator/responder role switch takes time. We don't have an
        # API to enforce it. So wait a few seconds for a semi-arbitrary
        # teardown.
        time.sleep(_WAIT_SEC_FOR_RTT_INITIATOR_RESPONDER_SWITCH)
        self.subscriber.log.info('Performing ranging to peer ID %d.', sub_peer)
        self._perform_ranging(
            self.subscriber,
            constants.RangingRequest(peer_ids=[sub_peer]),
        )

        # Test finished, clean up.
        self.publisher.wifi.wifiAwareCloseDiscoverSession(self.pub_session)
        self.subscriber.wifi.wifiAwareCloseDiscoverSession(self.sub_session)
        self.publisher.wifi.wifiAwareDetach(self.pub_attach_session)
        self.subscriber.wifi.wifiAwareDetach(self.sub_attach_session)

    def test_discovery_ranging_to_peer_mac_address(self) -> None:
        """Test ranging to a Wi-Fi Aware peer MAC address.

        See class docstring for the test steps and expected results.
        """
        pub_config = constants.PublishConfig(
            publish_type=constants.PublishType.UNSOLICITED,
            ranging_enabled=True,
        )
        sub_config = constants.SubscribeConfig(
            subscribe_type=constants.SubscribeType.PASSIVE,
            max_distance_mm=_LARGE_ENOUGH_DISTANCE_MM,
        )

        # Step 1 - 3: Set up Wi-Fi Aware discovery sessions, so it is ready
        # for ranging.
        pub_mac_address, sub_mac_address, _, _ = (
            self._setup_discovery_sessions(
                pub_config=pub_config,
                sub_config=sub_config,
            )
        )

        # Step 4: Perform ranging on the publisher and subscriber,
        # respectively. Each side ranges to the other side's MAC address.
        self.publisher.log.info(
            'Performing ranging to peer MAC address %s.', sub_mac_address
        )
        self._perform_ranging(
            self.publisher,
            constants.RangingRequest(peer_mac_addresses=[sub_mac_address]),
        )

        # RTT initiator/responder role switch takes time. We don't have an
        # API to enforce it. So wait a few seconds for a semi-arbitrary
        # teardown.
        time.sleep(_WAIT_SEC_FOR_RTT_INITIATOR_RESPONDER_SWITCH)
        self.subscriber.log.info(
            'Performing ranging to peer MAC address %s.', pub_mac_address
        )
        self._perform_ranging(
            self.subscriber,
            constants.RangingRequest(peer_mac_addresses=[pub_mac_address]),
        )

        # Test finished, clean up.
        self.publisher.wifi.wifiAwareCloseDiscoverSession(self.pub_session)
        self.subscriber.wifi.wifiAwareCloseDiscoverSession(self.sub_session)
        self.publisher.wifi.wifiAwareDetach(self.pub_attach_session)
        self.subscriber.wifi.wifiAwareDetach(self.sub_attach_session)

    def _setup_discovery_sessions(
        self,
        pub_config: constants.PublishConfig,
        sub_config: constants.SubscribeConfig,
    ) -> tuple[str, str, int, int]:
        """Sets up Wi-Fi Aware discovery sessions.

        Stores the attach session IDs and discovery session IDs on `self`
        (`pub_attach_session`, `sub_attach_session`, `pub_session`,
        `sub_session`) so that the tests can close them afterwards.

        Args:
            pub_config: The publish configuration.
            sub_config: The subscribe configuration.

        Returns:
            A tuple of (publisher MAC address, subscriber MAC address,
            publisher peer ID, subscriber peer ID). The publisher peer ID is
            the ID the publisher uses to address its peer; the subscriber
            peer ID is the ID the subscriber uses to address its peer.
        """
        # Step 1: Attach Wi-Fi Aware sessions.
        self.pub_attach_session, pub_mac_address = (
            aware_snippet_utils.start_attach(
                self.publisher, pub_config.ranging_enabled
            )
        )
        # NOTE(review): the subscriber attach also takes its flag from
        # `pub_config.ranging_enabled` -- presumably intentional so both
        # sides attach with the same ranging setting; confirm.
        self.sub_attach_session, sub_mac_address = (
            aware_snippet_utils.start_attach(
                self.subscriber, pub_config.ranging_enabled
            )
        )

        # Step 2: Publisher publishes a Wi-Fi Aware service, subscriber
        # subscribes to it. Wait for service discovery.
        (
            self.pub_session,
            pub_session_handler,
            self.sub_session,
            sub_session_handler,
            sub_peer,
        ) = aware_snippet_utils.publish_and_subscribe(
            publisher=self.publisher,
            pub_config=pub_config,
            pub_attach_session=self.pub_attach_session,
            subscriber=self.subscriber,
            sub_config=sub_config,
            sub_attach_session=self.sub_attach_session,
        )

        # Step 3: Send messages through the discovery sessions.
        # The subscriber talks first; the value returned for that exchange
        # is the peer ID the publisher then uses to reply.
        msg = _MSG_SUB_TO_PUB.format(random_id=utils.rand_ascii_str(5))
        pub_peer = aware_snippet_utils.send_msg_through_discovery_session(
            sender=self.subscriber,
            sender_discovery_session_handler=sub_session_handler,
            receiver=self.publisher,
            receiver_discovery_session_handler=pub_session_handler,
            discovery_session=self.sub_session,
            peer_on_sender=sub_peer,
            send_message=msg,
        )
        self.subscriber.log.info(
            'Sent a message to peer %d through discovery session.',
            sub_peer,
        )
        msg = _MSG_PUB_TO_SUB.format(random_id=utils.rand_ascii_str(5))
        aware_snippet_utils.send_msg_through_discovery_session(
            sender=self.publisher,
            sender_discovery_session_handler=pub_session_handler,
            receiver=self.subscriber,
            receiver_discovery_session_handler=sub_session_handler,
            discovery_session=self.pub_session,
            peer_on_sender=pub_peer,
            send_message=msg,
        )
        self.publisher.log.info(
            'Sent a message to peer %d through discovery session.',
            pub_peer,
        )
        return (pub_mac_address, sub_mac_address, pub_peer, sub_peer)

    def _perform_ranging(
        self,
        ad: android_device.AndroidDevice,
        request: constants.RangingRequest,
    ) -> None:
        """Performs ranging and checks the ranging result.

        The result must arrive through the "on ranging result" callback,
        contain exactly one entry with a SUCCESS status, a distance no larger
        than `_LARGE_ENOUGH_DISTANCE_MM`, and an RSSI no lower than
        `_MIN_RSSI`. If the entry carries a peer ID or MAC address, it must
        be one of those listed in the request.

        Args:
            ad: The Android device controller.
            request: The ranging request.
        """
        ad.log.debug('Starting ranging with request: %s', request)
        ranging_cb_handler = ad.wifi.wifiAwareStartRanging(request.to_dict())
        event = ranging_cb_handler.waitAndGet(
            event_name=constants.RangingResultCb.EVENT_NAME_ON_RANGING_RESULT,
            timeout=_DEFAULT_TIMEOUT,
        )

        callback_name = event.data.get(
            constants.RangingResultCb.DATA_KEY_CALLBACK_NAME, None
        )
        asserts.assert_equal(
            callback_name,
            constants.RangingResultCb.CB_METHOD_ON_RANGING_RESULT,
            'Ranging failed: got unexpected callback.',
        )

        results = event.data.get(
            constants.RangingResultCb.DATA_KEY_RESULTS, None
        )
        asserts.assert_true(
            results is not None and len(results) == 1,
            'Ranging got invalid results: null, empty, or wrong length.',
        )

        status_code = results[0].get(
            constants.RangingResultCb.DATA_KEY_RESULT_STATUS, None
        )
        asserts.assert_equal(
            status_code,
            constants.RangingResultStatusCode.SUCCESS,
            'Ranging peer failed: invalid result status code.',
        )

        distance_mm = results[0].get(
            constants.RangingResultCb.DATA_KEY_RESULT_DISTANCE_MM, None
        )
        asserts.assert_true(
            (
                distance_mm is not None
                and distance_mm <= _LARGE_ENOUGH_DISTANCE_MM
            ),
            'Ranging peer failed: invalid distance in ranging result.',
        )
        rssi = results[0].get(
            constants.RangingResultCb.DATA_KEY_RESULT_RSSI, None
        )
        asserts.assert_true(
            rssi is not None and rssi >= _MIN_RSSI,
            'Ranging peer failed: invalid rssi in ranging result.',
        )

        # The peer is identified either by ID or by MAC address depending on
        # how the request was built, so each is only checked when present.
        peer_id = results[0].get(
            constants.RangingResultCb.DATA_KEY_PEER_ID, None
        )
        if peer_id is not None:
            msg = 'Ranging peer failed: invalid peer ID in ranging result.'
            asserts.assert_in(peer_id, request.peer_ids, msg)

        peer_mac = results[0].get(constants.RangingResultCb.DATA_KEY_MAC, None)
        if peer_mac is not None:
            msg = (
                'Ranging peer failed: invalid peer MAC address in ranging '
                'result.'
            )
            asserts.assert_in(peer_mac, request.peer_mac_addresses, msg)

    def teardown_test(self) -> None:
        """Releases per-test resources on both devices and resets state."""
        utils.concurrent_exec(
            self._teardown_on_device,
            ((self.publisher,), (self.subscriber,)),
            max_workers=2,
            raise_on_exception=True,
        )
        # Forget the session IDs so the next test starts from a clean state.
        self.pub_session = None
        self.sub_session = None
        self.pub_attach_session = None
        self.sub_attach_session = None

    def _teardown_on_device(self, ad: android_device.AndroidDevice) -> None:
        """Releases resources and sessions after each test."""
        try:
            ad.wifi.connectivityReleaseAllSockets()
            ad.wifi.wifiAwareCloseAllWifiAwareSession()
        finally:
            # Always save the output excerpts, even if the cleanup calls
            # above raised.
            ad.services.create_output_excerpts_all(self.current_test_info)

    def on_fail(self, record: records.TestResult) -> None:
        """Collects bug reports from all devices when a test fails."""
        logging.info('Collecting bugreports...')
        android_device.take_bug_reports(
            self.ads, destination=self.current_test_info.output_path
        )


if __name__ == '__main__':
    # Take test args: drop everything up to and including the '--'
    # separator so that only the arguments after it reach the test runner.
    if '--' in sys.argv:
        index = sys.argv.index('--')
        sys.argv = sys.argv[:1] + sys.argv[index + 1 :]

    test_runner.main()