1import copy 2import dataclasses 3from enum import IntEnum, StrEnum 4from typing import Set, Dict 5from uuid import uuid4 6from lib.params import RangingPreference 7from mobly.controllers.android_device import AndroidDevice 8from mobly.controllers.android_device_lib.callback_handler_v2 import ( 9 CallbackHandlerV2, 10) 11from mobly.snippet.callback_event import CallbackEvent 12 13 14CALLBACK_WAIT_TIME_SEC = 5.0 15 16 17class RangingTechnology(IntEnum): 18 UWB = 0 19 BLE_CS = 1 20 WIFI_RTT = 2 21 BLE_RSSI = 3 22 23 24class Event(StrEnum): 25 OPENED = "OPENED" 26 OPEN_FAILED = "OPEN_FAILED" 27 STARTED = "STARTED" 28 DATA = "DATA" 29 STOPPED = "STOPPED" 30 CLOSED = "CLOSED" 31 OOB_SEND_CAPABILITIES_REQUEST = "OOB_SEND_CAPABILITIES_REQUEST", 32 OOB_SEND_CAPABILITIES_RESPONSE = "OOB_SEND_CAPABILITIES_RESPONSE", 33 OOB_SEND_SET_CONFIGURATION = "OOB_SEND_SET_CONFIGURATION", 34 OOB_SEND_STOP_RANGING = "OOB_SEND_STOP_RANGING", 35 OOB_SEND_UNKNOWN = "OOB_SEND_UNKNOWN", 36 OOB_CLOSED = "OOB_CLOSED" 37 38 39class RangingDecorator: 40 41 def __init__(self, ad: AndroidDevice): 42 """Initialize the ranging device. 43 44 Args: 45 ad: android device object 46 """ 47 self.id = str(uuid4()) 48 self.ad = ad 49 self._callback_events: Dict[str, CallbackHandlerV2] = {} 50 self.log = self.ad.log 51 self.uwb_address = None 52 self.bt_addr = None 53 54 def start_ranging( 55 self, session_handle: str, preference: RangingPreference 56 ): 57 """Start ranging with the specified preference. """ 58 handler = self.ad.ranging.startRanging( 59 session_handle, dataclasses.asdict(preference) 60 ) 61 self._callback_events[session_handle] = handler 62 63 def start_ranging_and_assert_opened( 64 self, session_handle: str, preference: RangingPreference 65 ): 66 """Start ranging with the specified preference and wait for onStarted event. 67 68 Throws: 69 CallbackHandlerTimeoutError if ranging does not successfully start. 70 """ 71 self.start_ranging(session_handle, preference) 72 self.assert_ranging_event_received(session_handle, Event.OPENED) 73 74 def is_ranging_technology_supported( 75 self, ranging_technology: RangingTechnology 76 ) -> bool: 77 """Checks whether a specific ranging technology is supported by the device""" 78 return self.ad.ranging.isTechnologySupported(ranging_technology) 79 80 def stop_ranging(self, session_handle: str): 81 self.ad.ranging.stopRanging(session_handle) 82 83 def assert_closed(self, session_handle: str): 84 self.assert_close_ranging_event_received(session_handle) 85 self._callback_events.pop(session_handle) 86 87 def stop_ranging_and_assert_closed(self, session_handle: str): 88 """Stop ranging and wait for onStopped event. 89 90 Throws: 91 CallbackHandlerTimeoutError if ranging was not successfully stopped. 92 """ 93 self.stop_ranging(session_handle) 94 self.assert_closed(session_handle) 95 96 def assert_close_ranging_event_received(self, session_handle: str): 97 self.assert_ranging_event_received(session_handle, Event.CLOSED) 98 99 def assert_ranging_event_received( 100 self, 101 session_handle: str, 102 event: Event, 103 timeout_s: float = CALLBACK_WAIT_TIME_SEC, 104 ) -> CallbackEvent: 105 """Asserts that the expected event is received before a timeout. 106 107 Args: 108 session_handle: identifier for the ranging session. 109 event: expected ranging event. 110 timeout_s: timeout in seconds. 111 112 Returns: 113 The event 114 115 Throws: 116 CallbackHandlerTimeoutError if the expected event was not received. 117 """ 118 handler = self._callback_events[session_handle] 119 return handler.waitAndGet(event, timeout=timeout_s) 120 121 def verify_received_data_from_peer_using_technologies( 122 self, 123 session_handle: str, 124 peer_id: str, 125 technologies: Set[RangingTechnology], 126 timeout_s: float = CALLBACK_WAIT_TIME_SEC, 127 ) -> bool: 128 """Verifies that the peer sends us data from all provided technologies before a timeout. 129 130 Args: 131 session_handle: unique identifier for the ranging session. 132 peer_id: UUID of the peer. 133 technologies: set of ranging technologies over which we want to receive 134 data. 135 timeout_s: timeout in seconds. 136 137 Returns: 138 True if peer was found, False otherwise 139 """ 140 remaining_technologies = copy.deepcopy(technologies) 141 142 def predicate(event): 143 technology = event.data["technology"] 144 145 if event.data["peer_id"] == peer_id and technology in copy.deepcopy( 146 remaining_technologies 147 ): 148 remaining_technologies.remove(technology) 149 150 return not bool(remaining_technologies) 151 152 handler = self._callback_events[session_handle] 153 try: 154 handler.waitForEvent(Event.DATA, predicate, timeout=timeout_s) 155 return True 156 except Exception: 157 return False 158 159 def verify_received_data_from_peer( 160 self, 161 session_handle: str, 162 peer_id: str, 163 timeout_s: float = CALLBACK_WAIT_TIME_SEC, 164 ) -> bool: 165 """Verifies that the peer sends us data using any technology before a timeout. 166 167 Args: 168 session_handle: unique identifier for the ranging session. 169 peer_id: UUID of the peer. 170 timeout_s: timeout in seconds. 171 172 Returns: 173 True if peer received data, False otherwise 174 """ 175 176 def predicate(event): 177 return event.data["peer_id"] == peer_id 178 179 handler = self._callback_events[session_handle] 180 try: 181 handler.waitForEvent(Event.DATA, predicate, timeout=timeout_s) 182 return True 183 except Exception: 184 return False 185 186 def clear_ranging_sessions(self): 187 """Stop all ranging sessions and clear callback events""" 188 for session_handle in self._callback_events.keys(): 189 self.ad.ranging.stopRanging(session_handle) 190 191 self._callback_events.clear() 192 193 def get_callback_handler(self, session_handle: str) -> CallbackHandlerV2: 194 """Get the mobly CallbackHandler associated with the provided session""" 195 return self._callback_events.get(session_handle, None) 196 197