• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import copy
2import dataclasses
3from enum import IntEnum, StrEnum
4from typing import Set, Dict
5from uuid import uuid4
6from lib.params import RangingPreference
7from mobly.controllers.android_device import AndroidDevice
8from mobly.controllers.android_device_lib.callback_handler_v2 import (
9    CallbackHandlerV2,
10)
11from mobly.snippet.callback_event import CallbackEvent
12
13
# Default number of seconds to wait for a ranging callback event before the
# wait is treated as timed out.
CALLBACK_WAIT_TIME_SEC: float = 5.0
15
16
class RangingTechnology(IntEnum):
  """Integer identifiers for the ranging technologies used by these tests.

  NOTE(review): the numeric values presumably mirror the constants expected by
  the device-side ranging snippet -- confirm before changing them.
  """

  UWB = 0
  BLE_CS = 1
  WIFI_RTT = 2
  BLE_RSSI = 3
22
23
24class Event(StrEnum):
25  OPENED = "OPENED"
26  OPEN_FAILED = "OPEN_FAILED"
27  STARTED = "STARTED"
28  DATA = "DATA"
29  STOPPED = "STOPPED"
30  CLOSED = "CLOSED"
31  OOB_SEND_CAPABILITIES_REQUEST = "OOB_SEND_CAPABILITIES_REQUEST",
32  OOB_SEND_CAPABILITIES_RESPONSE = "OOB_SEND_CAPABILITIES_RESPONSE",
33  OOB_SEND_SET_CONFIGURATION = "OOB_SEND_SET_CONFIGURATION",
34  OOB_SEND_STOP_RANGING = "OOB_SEND_STOP_RANGING",
35  OOB_SEND_UNKNOWN = "OOB_SEND_UNKNOWN",
36  OOB_CLOSED = "OOB_CLOSED"
37
38
class RangingDecorator:
  """Wraps an AndroidDevice to drive ranging sessions via its `ranging` snippet.

  The callback handler returned when a session is started is stored under the
  session handle, so later calls can wait on that session's events.
  """

  def __init__(self, ad: AndroidDevice):
    """Initialize the ranging device.

    Args:
      ad: android device object
    """
    self.id = str(uuid4())
    self.ad = ad
    # Maps session handle -> callback handler returned by startRanging.
    self._callback_events: Dict[str, CallbackHandlerV2] = {}
    self.log = self.ad.log
    # Not assigned anywhere else in this class; presumably set by test code.
    self.uwb_address = None
    self.bt_addr = None

  def start_ranging(
      self, session_handle: str, preference: RangingPreference
  ):
    """Start ranging with the specified preference.

    The handler returned by the snippet is remembered under `session_handle`,
    replacing any handler previously stored for the same handle.

    Args:
      session_handle: identifier for the ranging session.
      preference: ranging preference; sent to the snippet as a dict.
    """
    handler = self.ad.ranging.startRanging(
        session_handle, dataclasses.asdict(preference)
    )
    self._callback_events[session_handle] = handler

  def start_ranging_and_assert_opened(
      self, session_handle: str, preference: RangingPreference
  ):
    """Start ranging with the specified preference and wait for the OPENED event.

    Args:
      session_handle: identifier for the ranging session.
      preference: ranging preference; sent to the snippet as a dict.

    Raises:
      CallbackHandlerTimeoutError if the OPENED event is not received in time.
    """
    self.start_ranging(session_handle, preference)
    self.assert_ranging_event_received(session_handle, Event.OPENED)

  def is_ranging_technology_supported(
      self, ranging_technology: RangingTechnology
  ) -> bool:
    """Checks whether a specific ranging technology is supported by the device."""
    return self.ad.ranging.isTechnologySupported(ranging_technology)

  def stop_ranging(self, session_handle: str):
    """Ask the snippet to stop the session; does not wait for any event."""
    self.ad.ranging.stopRanging(session_handle)

  def assert_closed(self, session_handle: str):
    """Wait for the CLOSED event, then forget the session's callback handler.

    Raises:
      KeyError if no handler is stored for `session_handle`.
      CallbackHandlerTimeoutError if the CLOSED event is not received in time.
    """
    self.assert_close_ranging_event_received(session_handle)
    self._callback_events.pop(session_handle)

  def stop_ranging_and_assert_closed(self, session_handle: str):
    """Stop ranging and wait for the CLOSED event.

    Raises:
      CallbackHandlerTimeoutError if the CLOSED event is not received in time.
    """
    self.stop_ranging(session_handle)
    self.assert_closed(session_handle)

  def assert_close_ranging_event_received(self, session_handle: str):
    """Wait for the CLOSED event of the session using the default timeout."""
    self.assert_ranging_event_received(session_handle, Event.CLOSED)

  def assert_ranging_event_received(
      self,
      session_handle: str,
      event: Event,
      timeout_s: float = CALLBACK_WAIT_TIME_SEC,
  ) -> CallbackEvent:
    """Asserts that the expected event is received before a timeout.

    Args:
      session_handle: identifier for the ranging session.
      event: expected ranging event.
      timeout_s: timeout in seconds.

    Returns:
      The event

    Raises:
      KeyError if no handler is stored for `session_handle`.
      CallbackHandlerTimeoutError if the expected event was not received.
    """
    handler = self._callback_events[session_handle]
    return handler.waitAndGet(event, timeout=timeout_s)

  def _wait_for_data_event(
      self, session_handle: str, predicate, timeout_s: float
  ) -> bool:
    """Wait for a DATA event accepted by `predicate`; report success as a bool.

    Any exception raised while waiting (including a timeout) is logged at
    debug level and reported as False. An unknown `session_handle` still
    raises KeyError, because the lookup happens before the wait.
    """
    handler = self._callback_events[session_handle]
    try:
      handler.waitForEvent(Event.DATA, predicate, timeout=timeout_s)
    except Exception as e:  # Best effort by design: callers only get a bool.
      self.log.debug(
          "No matching %s event for session %s: %r",
          Event.DATA,
          session_handle,
          e,
      )
      return False
    return True

  def verify_received_data_from_peer_using_technologies(
      self,
      session_handle: str,
      peer_id: str,
      technologies: Set[RangingTechnology],
      timeout_s: float = CALLBACK_WAIT_TIME_SEC,
  ) -> bool:
    """Verifies that the peer sends us data over all given technologies.

    Args:
      session_handle: unique identifier for the ranging session.
      peer_id: UUID of the peer.
      technologies: set of ranging technologies over which we want to receive
        data.
      timeout_s: timeout in seconds.

    Returns:
      True if data from the peer was seen for every requested technology
      before the timeout, False otherwise.
    """
    # Private mutable copy: the caller's collection is never modified, and any
    # iterable (including a frozenset) is accepted.
    remaining_technologies = set(technologies)

    def predicate(event):
      technology = event.data["technology"]

      if event.data["peer_id"] == peer_id:
        remaining_technologies.discard(technology)

      # Satisfied once every requested technology has been observed.
      return not remaining_technologies

    return self._wait_for_data_event(session_handle, predicate, timeout_s)

  def verify_received_data_from_peer(
      self,
      session_handle: str,
      peer_id: str,
      timeout_s: float = CALLBACK_WAIT_TIME_SEC,
  ) -> bool:
    """Verifies that the peer sends us data using any technology before a timeout.

    Args:
      session_handle: unique identifier for the ranging session.
      peer_id: UUID of the peer.
      timeout_s: timeout in seconds.

    Returns:
      True if a DATA event from the peer was received, False otherwise
    """

    def predicate(event):
      return event.data["peer_id"] == peer_id

    return self._wait_for_data_event(session_handle, predicate, timeout_s)

  def clear_ranging_sessions(self):
    """Stop all ranging sessions and clear callback events."""
    for session_handle in self._callback_events:
      self.ad.ranging.stopRanging(session_handle)

    self._callback_events.clear()

  def get_callback_handler(
      self, session_handle: str
  ) -> CallbackHandlerV2 | None:
    """Get the mobly CallbackHandler for the session, or None if there is none."""
    return self._callback_events.get(session_handle)
196
197