# Copyright 2020 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Server side bluetooth tests on Advertisement Monitor API""" import time import logging import array from autotest_lib.client.bin import utils from autotest_lib.server.cros.bluetooth import bluetooth_adapter_tests from autotest_lib.client.common_lib import error class TestMonitor(): """Local object hosting the test values for Advertisement Monitor object. This class holds the values of parameters for creating an Advertisement Monitor object. """ # Index of the pattern data in the patterns filter. PATTERN_DATA_IDX = 2 def __init__(self, app_id): """Construction of a local monitor object. @param app_id: the app id associated with the monitor. """ self.type = None self.rssi = [] self.patterns = [] self.monitor_id = None self.app_id = app_id def _bytes(self, str_data): """Convert string data to byte array. @param str_data: the string data. @returns: the byte array. """ return [b for b in array.array('B', str_data)] def update_type(self, monitor_type): """Update the monitor type. @param monitor_type: type of the monitor. """ self.type = monitor_type def update_rssi(self, monitor_rssi): """Update the RSSI filter values. @param rssi: the list of rssi threshold and timeout values. """ self.rssi = monitor_rssi def update_patterns(self, monitor_patterns): """Update the content filter patterns. @param patterns: the list of start position, ad type and patterns. """ # Convert string patterns to byte array, if any. for pattern in monitor_patterns: if isinstance(pattern[self.PATTERN_DATA_IDX], str): pattern[self.PATTERN_DATA_IDX] = self._bytes( pattern[self.PATTERN_DATA_IDX]) self.patterns = monitor_patterns def update_monitor_id(self, monitor_id): """Store the monitor id returned by add_monitor(). @param monitor_id: the monitor id. """ self.monitor_id = monitor_id def get_monitor_data(self): """Return the monitor parameters. @returns: List containing the monitor data. """ return [self.type, self.rssi, self.patterns] def get_monitor_id(self): """Return the monitor id. @returns: monitor id if monitor is already added, None otherwise. """ return self.monitor_id def get_app_id(self): """Return the application id. @returns: app id associated to the monitor object. """ return self.app_id class BluetoothAdapterAdvMonitorTests( bluetooth_adapter_tests.BluetoothAdapterTests): """Server side bluetooth adapter advertising Test. This class comprises a number of test cases to verify bluetooth Advertisement Monitor API. Refer to the test plan doc for more details: go/bt-advmon-api-test-plan """ ADD_MONITOR_POLLING_TIMEOUT_SECS = 3 ADD_MONITOR_POLLING_SLEEP_SECS = 1 PAIR_TEST_SLEEP_SECS = 5 # Non-zero count value is used to indicate the case where multiple # DeviceFound/DeviceLost events are expected to occur. MULTIPLE_EVENTS = -1 # Number of cycle to observe during a test INTERLEAVE_SCAN_TEST_CYCLE = 10 # Acceptable extra delay of interleave scan duration, in sec INTERLEAVE_SCAN_DURATION_TOLERANCE = 0.1 # Acceptable delay of cancelling interleave scan, in sec INTERLEAVE_SCAN_CANCEL_TOLERANCE = 2 # Acceptable extra/missing cycles in interleave scan INTERLEAVE_SCAN_CYCLE_NUM_TOLERANCE = 2 # Duration of kernel perform 'start discovery', in sec DISCOVERY_DURATION = 10.24 test_case_log = bluetooth_adapter_tests.test_case_log test_retry_and_log = bluetooth_adapter_tests.test_retry_and_log def advmon_check_manager_interface_exist(self): """Check if AdvertisementMonitorManager1 interface is available. @returns: True if Manager interface is available, False otherwise. """ return self.bluetooth_facade.advmon_check_manager_interface_exist() def read_supported_types(self): """Read the Advertisement Monitor supported monitor types. @returns: List of supported advertisement monitor types. """ return self.bluetooth_facade.advmon_read_supported_types() def read_supported_features(self): """Read the Advertisement Monitor supported features. @returns: List of supported advertisement monitor features. """ return self.bluetooth_facade.advmon_read_supported_features() def create_app(self): """Create an advertisement monitor app. @returns: app id, once the app is created. """ return self.bluetooth_facade.advmon_create_app() def exit_app(self, app_id): """Exit an advertisement monitor app. @param app_id: the app id. @returns: True on success, False otherwise. """ return self.bluetooth_facade.advmon_exit_app(app_id) def kill_app(self, app_id): """Kill an advertisement monitor app by sending SIGKILL. @param app_id: the app id. @returns: True on success, False otherwise. """ return self.bluetooth_facade.advmon_kill_app(app_id) def register_app(self, app_id): """Register an advertisement monitor app. @param app_id: the app id. @returns: True on success, False otherwise. """ return self.bluetooth_facade.advmon_register_app(app_id) def unregister_app(self, app_id): """Unregister an advertisement monitor app. @param app_id: the app id. @returns: True on success, False otherwise. """ return self.bluetooth_facade.advmon_unregister_app(app_id) def add_monitor(self, app_id, monitor_data): """Create an Advertisement Monitor object. @param app_id: the app id. @param monitor_data: the list containing monitor type, RSSI filter values and patterns. @returns: monitor id, once the monitor is created, None otherwise. """ return self.bluetooth_facade.advmon_add_monitor(app_id, monitor_data) def remove_monitor(self, app_id, monitor_id): """Remove the Advertisement Monitor object. @param app_id: the app id. @param monitor_id: the monitor id. @returns: True on success, False otherwise. """ return self.bluetooth_facade.advmon_remove_monitor(app_id, monitor_id) def get_event_count(self, app_id, monitor_id, event='All'): """Read the count of a particular event on the given monitor. @param app_id: the app id. @param monitor_id: the monitor id. @param event: name of the specific event or 'All' for all events. @returns: count of the specific event or dict of counts of all events. """ return self.bluetooth_facade.advmon_get_event_count(app_id, monitor_id, event) def reset_event_count(self, app_id, monitor_id, event='All'): """Reset the count of a particular event on the given monitor. @param app_id: the app id. @param monitor_id: the monitor id. @param event: name of the specific event or 'All' for all events. @returns: True on success, False otherwise. """ return self.bluetooth_facade.advmon_reset_event_count(app_id, monitor_id, event) def interleave_logger_start(self): """ Start interleave logger recording """ self.bluetooth_facade.advmon_interleave_scan_logger_start() def interleave_logger_stop(self): """ Stop interleave logger recording @returns: True if logs were successfully collected, False otherwise. """ return self.bluetooth_facade.advmon_interleave_scan_logger_stop() def interleave_logger_get_records(self): """ Get records in previous log collections @returns: a list of records, where each item is a record of interleave |state| and the |time| the state starts. |state| could be {'no filter', 'allowlist'} |time| is system time in sec """ return self.bluetooth_facade.\ advmon_interleave_scan_logger_get_records() def interleave_logger_get_cancel_event(self): """ Get cancel event in previous log collections @returns: the first cancel event in the collections, None if no cancel event was found """ events = self.bluetooth_facade.\ advmon_interleave_scan_logger_get_cancel_events() if len(events) == 0: event = None else: event = events[0] if len(events) > 1: logging.warning('More than one cancel events found %s', events) return event @test_retry_and_log(False) def test_supported_types(self): """Test supported monitor types. @returns: True on success, False otherwise. """ supported_types = self.read_supported_types() for supported_type in supported_types: logging.info('type: %s', supported_type) # TODO(b/169658213) - add check for supported types. return True @test_retry_and_log(False) def test_supported_features(self): """Test supported features. @returns: True on success, False otherwise. """ supported_features = self.read_supported_features() for supported_feature in supported_features: logging.info('feature: %s', supported_feature) # TODO(b/169658213) - add check for supported features. return True def test_is_controller_offloading_supported(self): """Check if controller based RSSI filtering is supported. By default the LE_SCAN_FILTER_DUP flag is enabled on all platforms. Due to this, the host does not receive as many advertisements during passive scanning, which causes SW based RSSI filtering not to work as intended. So, if the controller offloading is not supported, skip the tests that involves RSSI filtering and raise TEST_NA. @raises: TestNA if controller based RSSI filtering is not supported. """ supported_features = self.read_supported_features() if not supported_features: logging.info('Controller offloading not supported') raise error.TestNAError('Controller offloading not supported') def test_is_adv_monitoring_supported(self, require_rssi_filtering = False): """Check if Adv Monitor API is supported. If AdvMonitor API is not supported by the platform, AdvertisementMonitorManager1 interface won't be exposed by bluetoothd. In such case, skip the test and raise TestNA. @param require_rssi_filtering: True if test requires RSSI filtering. @raises: TestNA if Adv Monitor API is not supported or if controller based RSSI filtering is not supported. """ if not self.advmon_check_manager_interface_exist(): logging.info('Advertisement Monitor API not supported') raise error.TestNAError('Advertisement Monitor API not supported') if require_rssi_filtering: self.test_is_controller_offloading_supported() @test_retry_and_log(False) def test_exit_app(self, app_id): """Test exit application. @param app_id: the app id. @returns: True on success, False otherwise. """ return self.exit_app(app_id) @test_retry_and_log(False) def test_kill_app(self, app_id): """Test kill application. @param app_id: the app id. @returns: True on success, False otherwise. """ return self.kill_app(app_id) @test_retry_and_log(False) def test_register_app(self, app_id, expected=True): """Test register application. @param app_id: the app id. @param expected: expected result of the RegisterMonitor method. @returns: True on success, False otherwise. """ return self.register_app(app_id) == expected @test_retry_and_log(False) def test_unregister_app(self, app_id, expected=True): """Test unregister application. @param app_id: the app id. @param expected: expected result of the UnregisterMonitor method. @returns: True on success, False otherwise. """ return self.unregister_app(app_id) == expected @test_retry_and_log(False) def test_monitor_activate(self, monitor, expected): """Test if the Activate method on the monitor has been invoked or not. @param monitor: the local monitor object. @param expected: expected state of the Activate event. @returns: True on success, False otherwise. """ app_id = monitor.get_app_id() monitor_id = monitor.get_monitor_id() if monitor_id is None: return False def _check_activate(): """Handler for the activate event.""" return self.get_event_count(app_id, monitor_id, 'Activate') == 1 activated = False try: utils.poll_for_condition( condition=_check_activate, timeout=self.ADD_MONITOR_POLLING_TIMEOUT_SECS, sleep_interval=self.ADD_MONITOR_POLLING_SLEEP_SECS, desc='Waiting for activate') activated = True except utils.TimeoutError as e: logging.error('activate: %s', e) except: logging.error('activate: unexpected error') return expected == activated @test_retry_and_log(False) def test_monitor_release(self, monitor, expected): """Test if the Release method on the monitor has been invoked or not. @param monitor: the local monitor object. @param expected: expected state of the Release event. @returns: True on success, False otherwise. """ app_id = monitor.get_app_id() monitor_id = monitor.get_monitor_id() if monitor_id is None: return False def _check_release(): """Handler for the release event.""" return self.get_event_count(app_id, monitor_id, 'Release') == 1 released = False try: utils.poll_for_condition( condition=_check_release, timeout=self.ADD_MONITOR_POLLING_TIMEOUT_SECS, sleep_interval=self.ADD_MONITOR_POLLING_SLEEP_SECS, desc='Waiting for release') released = True except utils.TimeoutError as e: logging.error('release: %s', e) except Exception as e: logging.error('release: %s', e) except: logging.error('release: unexpected error') return expected == released @test_retry_and_log(True) def test_device_found(self, monitor, count, delay=0): """Test if the DeviceFound method on a monitor has been invoked or not. @param monitor: the local monitor object. @param count: expected count of the DeviceFound events. @param delay: wait until 'delay' seconds before reading the event count. @returns: True on success, False otherwise. """ app_id = monitor.get_app_id() monitor_id = monitor.get_monitor_id() if monitor_id is None: return False if delay: time.sleep(delay) checked_count = self.get_event_count(app_id, monitor_id, 'DeviceFound') if count == self.MULTIPLE_EVENTS: return checked_count > 0 return checked_count == count @test_retry_and_log(False) def test_device_lost(self, monitor, count, delay=0): """Test if the DeviceLost method on a monitor has been invoked or not. @param monitor: the local monitor object. @param count: expected count of the DeviceLost events. @param delay: wait until 'delay' seconds before reading the event count. @returns: True on success, False otherwise. """ app_id = monitor.get_app_id() monitor_id = monitor.get_monitor_id() if monitor_id is None: return False if delay: time.sleep(delay) checked_count = self.get_event_count(app_id, monitor_id, 'DeviceLost') if count == self.MULTIPLE_EVENTS: return checked_count > 1 return checked_count == count @test_retry_and_log(False) def test_reset_event_count(self, monitor, event='All'): """Test resetting count of a particular event on the given monitor. @param monitor: the local monitor object. @param event: name of the specific event or 'All' for all events. @returns: True on success, False otherwise. """ return self.reset_event_count(monitor.get_app_id(), monitor.get_monitor_id(), event) @test_retry_and_log(False) def test_add_monitor(self, monitor, expected_activate=None, expected_release=None): """Test adding a monitor. @param monitor: the local monitor object. @param expected_activate: expected state of the Activate event. @param expected_release: expected state of the Release event. @returns: True on success, False otherwise. """ app_id = monitor.get_app_id() monitor_id = self.add_monitor(app_id, monitor.get_monitor_data()) if monitor_id is None: return False monitor.update_monitor_id(monitor_id) checked_activate = True if expected_activate is not None: checked_activate = self.test_monitor_activate( monitor, expected_activate) checked_release = True if expected_release is not None: checked_release = self.test_monitor_release( monitor, expected_release) if self.get_event_count(app_id, monitor_id, 'Release') != 0: self.remove_monitor(app_id, monitor_id) monitor.update_monitor_id(None) self.results = { 'activated': checked_activate, 'released': checked_release } return all(self.results.values()) @test_retry_and_log(False) def test_remove_monitor(self, monitor): """Test removing a monitor. @param monitor: the local monitor object. @returns: True on success, False otherwise. """ app_id = monitor.get_app_id() monitor_id = monitor.get_monitor_id() if monitor_id is None: return False ret = self.remove_monitor(app_id, monitor_id) monitor.update_monitor_id(None) if ret is None: return False return True @test_retry_and_log(False) def test_setup_peer_devices(self): """Test availability of the peer devices. @returns: True on success, False otherwise. """ self.peer_keybd = None self.peer_mouse = None for device_type, device_list in self.devices.items(): for device in device_list: if device_type is 'BLE_KEYBOARD': self.peer_keybd = device elif device_type is 'BLE_MOUSE': self.peer_mouse = device if self.peer_keybd is not None and self.peer_mouse is not None: self.test_stop_peer_device_adv(self.peer_keybd) self.test_stop_peer_device_adv(self.peer_mouse) self.results = { 'keybd': self.peer_keybd is not None, 'mouse': self.peer_mouse is not None } return all(self.results.values()) @test_retry_and_log(False) def test_start_peer_device_adv(self, device, duration=0): """Test enabling the peer device advertisements. @param device: the device object. @param duration: the duration of the advertisement. @returns: True on success, False otherwise. """ ret = self.test_device_set_discoverable(device, True) if duration: time.sleep(duration) return ret @test_retry_and_log(False) def test_stop_peer_device_adv(self, device, duration=0): """Test disabling the peer device advertisements. @param device: the device object. @param duration: the duration of the advertisement disable. @returns: True on success, False otherwise. """ ret = self.test_device_set_discoverable(device, False) if duration: time.sleep(duration) return ret def check_records_interleaving(self, durations, records, expect_cycles): """ Check the state of records is interleaving and also the duration is as expected. @param durations: a dict of {'allowlist': allowlist_duration, 'no filter': no_filter_duration} @param records: a list of records @returns: a dict of {'Interleaved': record_state_is_interleaved, 'Span within range': duration_is_expected} """ actual_cycle = len(records) / len(durations.keys()) offset = self.INTERLEAVE_SCAN_CYCLE_NUM_TOLERANCE expect_cycle_lowerbound = max(1, expect_cycles - offset) expect_cycle_upperbound = expect_cycles + offset enough_cycle_num = (actual_cycle >= expect_cycle_lowerbound and actual_cycle <= expect_cycle_upperbound) interleaved = True span_within_range = True expected_state = None def _next_state(state): if state == 'allowlist': return 'no filter' elif state == 'no filter': return 'allowlist' else: logging.warning('Unexpected state %s', state) return None for i, record in enumerate(records): state = record['state'] nstate = _next_state(state) # We can't count span on single data point and expected_state # hasn't set if i != 0: span = (record['time'] - records[i - 1]['time']) if state != expected_state: interleaved = False if span < durations[nstate] -\ self.INTERLEAVE_SCAN_DURATION_TOLERANCE: span_within_range = False if span > durations[nstate] +\ self.INTERLEAVE_SCAN_DURATION_TOLERANCE: span_within_range = False expected_state = nstate return { 'Enough cycle number': enough_cycle_num, 'Interleaved': interleaved, 'Span within range': span_within_range } def check_records_paused(self, records, cancel_event, expect_paused_time, expect_resume): """ Check if the interleave scan is paused @param records: a list of records @param cancel_event: the timestamp interleave was canceled @param expect_paused_time: minimum duration of interleave scan paused @param expect_resume: True if interleave scan should restart, False if ***we don't care*** @returns: a dict of {'Cancel event': (bool), 'Non-empty records before paused': (bool), 'Non-empty records after paused': (bool), 'Paused enough time': (bool) } Note: some entries might not exist if it doesn't make sense in that case. """ result = {} result.update({'Cancel event': cancel_event is not None}) if cancel_event is None: return result canceled_time = cancel_event + self.INTERLEAVE_SCAN_CANCEL_TOLERANCE before_paused_rec = [r for r in records if r['time'] < canceled_time] after_paused_rec = [r for r in records if r['time'] >= canceled_time] result.update({ 'Non-empty records before paused': len(before_paused_rec) != 0 }) if expect_resume: result.update({ 'Non-empty records after paused': len(after_paused_rec) != 0 }) if len(before_paused_rec) > 0 and len(after_paused_rec) > 0: # Records are stored chronologically. last_time_before_paused = before_paused_rec[-1]['time'] first_time_after_paused = after_paused_rec[0]['time'] paused_time = first_time_after_paused - last_time_before_paused result.update( {'Paused enough time': paused_time >= expect_paused_time}) return result def get_interleave_scan_durations(self): """ Get interleave scan duration. @returns: a dict of {'allowlist': allowlist_duration, 'no filter': no_filter_duration} """ # TODO(b/171844106): get this parameters via # MGMT_OP_READ_DEF_SYSTEM_CONFIG durations = {'allowlist': 300, 'no filter': 500} # Change the unit from msec to second for future convenience. durations = {key: value * 0.001 for key, value in durations.items()} return durations @test_retry_and_log(False) def test_interleaving_state(self, expect_true, cycles=INTERLEAVE_SCAN_TEST_CYCLE): """ Test for checking if kernel is doing interleave scan or not. @params expect_true: True if kernel should be running interleave scan False if kernel shouldn't. @params cycles: number of cycles to collect logs @returns: True on success, False otherwise. """ durations = self.get_interleave_scan_durations() interleave_period = sum(durations.values()) log_time = interleave_period * cycles self.interleave_logger_start() time.sleep(log_time) self.interleave_logger_stop() records = self.interleave_logger_get_records() logging.debug(records) if not expect_true: self.results = {'No records': len(records) == 0} else: self.results = self.check_records_interleaving( durations, records, cycles) return all(self.results.values()) @test_retry_and_log(False) def test_interleaving_suspend_resume(self): """ Test for checking if kernel paused interleave scan during system suspended. @returns: True on success, False otherwise. """ durations = self.get_interleave_scan_durations() interleave_period = sum(durations.values()) # make sure suspend time is long enough to verify there is no # interleaving during suspended expect_suspend_time = max(self.SUSPEND_TIME_SECS, 2 * interleave_period) # make sure we'll get some records before/after system suspended extra_sleep_time = 2 * interleave_period self.interleave_logger_start() time.sleep(extra_sleep_time) self.suspend_resume(suspend_time=expect_suspend_time) time.sleep(extra_sleep_time) self.interleave_logger_stop() records = self.interleave_logger_get_records() cancel_event = self.interleave_logger_get_cancel_event() logging.debug(records) logging.debug(cancel_event) # Currently resume time is not very reliable. It is likely the actual # time in sleeping is less than expect_suspend_time. # Check the interleave scan paused for at least one cycle long instead. self.results = self.check_records_paused(records, cancel_event, interleave_period, True) return all(self.results.values()) @test_retry_and_log(False) def test_interleaving_active_scan_cycle(self): """ Test for checking if kernel paused interleave scan during active scan. @returns: True on success, False otherwise. """ durations = self.get_interleave_scan_durations() interleave_period = sum(durations.values()) # make sure we'll get some records before/after active scan extra_sleep_time = 2 * interleave_period self.interleave_logger_start() time.sleep(extra_sleep_time) self.test_start_discovery() time.sleep(extra_sleep_time + self.INTERLEAVE_SCAN_CANCEL_TOLERANCE) self.interleave_logger_stop() records = self.interleave_logger_get_records() cancel_event = self.interleave_logger_get_cancel_event() logging.debug(records) logging.debug(cancel_event) # BlueZ pauses discovery for every DISCOVERY_DURATION then restarts it # 5 seconds later. Interleave scan also get restarted during the paused # time. self.results = self.check_records_paused(records, cancel_event, self.DISCOVERY_DURATION, False) self.test_stop_discovery() return all(self.results.values()) def advmon_test_monitor_creation(self): """Test case: MONITOR_CREATION Validate register/unregister app and create/remove monitor. """ self.test_is_adv_monitoring_supported() # Create a test app instance. app1 = self.create_app() monitor1 = TestMonitor(app1) monitor1.update_type('or_patterns') monitor1.update_rssi([-40, 5, -60, 5]) monitor1.update_patterns([ [0, 0x19, [0xc2, 0x03]], ]) monitor2 = TestMonitor(app1) monitor2.update_type('or_patterns') monitor2.update_rssi([-40, 10, -60, 10]) monitor2.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) # Read supported types and features, should not fail. self.test_supported_types() self.test_supported_features() # Activate/Release should not get called. self.test_add_monitor(monitor1, expected_activate=False, expected_release=False) # Register the app, should not fail. self.test_register_app(app1) # Already registered app path, should fail with AlreadyExists. self.test_register_app(app1, expected=False) # Activate should get called for the monitor added before register app. self.test_monitor_activate(monitor1, expected=True) # Correct monitor parameters, activate should get called. self.test_add_monitor(monitor2, expected_activate=True) # Remove a monitor, should not fail. self.test_remove_monitor(monitor1) # Unregister the app, should not fail. self.test_unregister_app(app1) # Already unregistered app path, should fail with DoesNotExists. self.test_unregister_app(app1, expected=False) # Release should get called for a monitor not removed before unregister. self.test_monitor_release(monitor2, expected=True) # Remove another monitor, should not fail. self.test_remove_monitor(monitor2) # Terminate the test app instance. self.test_exit_app(app1) def advmon_test_monitor_validity(self): """Test case: MONITOR_VALIDITY Validate monitor parameters - monitor type, patterns, RSSI filter values. """ self.test_is_adv_monitoring_supported() # Create a test app instance. app1 = self.create_app() monitor1 = TestMonitor(app1) monitor1.update_type('incorrect_pattern') monitor1.update_rssi([-40, 5, -60, 5]) monitor1.update_patterns([ [0, 0x19, [0xc2, 0x03]], ]) monitor2 = TestMonitor(app1) monitor2.update_type('or_patterns') monitor2.update_rssi([-40, 10, -60, 10]) monitor2.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) # Register the app, should not fail. self.test_register_app(app1) # Incorrect monitor type, release should get called. self.test_add_monitor(monitor1, expected_release=True) # Incorrect rssi parameters, release should get called. monitor2.update_rssi([-40, 0, -60, 10]) self.test_add_monitor(monitor2, expected_release=True) monitor2.update_rssi([-40, 10, -60, 0]) self.test_add_monitor(monitor2, expected_release=True) monitor2.update_rssi([40, 10, -60, 10]) self.test_add_monitor(monitor2, expected_release=True) monitor2.update_rssi([-140, 10, -60, 10]) self.test_add_monitor(monitor2, expected_release=True) monitor2.update_rssi([-40, 10, 60, 10]) self.test_add_monitor(monitor2, expected_release=True) monitor2.update_rssi([-40, 10, -160, 10]) self.test_add_monitor(monitor2, expected_release=True) monitor2.update_rssi([-60, 10, -40, 10]) self.test_add_monitor(monitor2, expected_release=True) # Unset the rssi filter parameters. monitor2.update_rssi([127, 0, 127, 0]) # Incorrect pattern parameters, release should get called. monitor2.update_patterns([ [32, 0x09, 'MOUSE'], ]) self.test_add_monitor(monitor2, expected_release=True) monitor2.update_patterns([ [0, 0x00, 'MOUSE'], ]) self.test_add_monitor(monitor2, expected_release=True) monitor2.update_patterns([ [0, 0x40, 'MOUSE'], ]) self.test_add_monitor(monitor2, expected_release=True) monitor2.update_patterns([ [0, 0x09, '0123456789ABCDEF0123456789ABCDEF0'], ]) self.test_add_monitor(monitor2, expected_release=True) monitor2.update_patterns([ [32, 0x09, [0xc2, 0x03]], [0, 3, [0x12, 0x18]], ]) self.test_add_monitor(monitor2, expected_release=True) monitor2.update_patterns([ [0, 0x19, [0xc2, 0x03]], [0, 0x00, [0x12, 0x18]], ]) self.test_add_monitor(monitor2, expected_release=True) # Correct pattern parameters, activate should get called. monitor2.update_patterns([ [0, 0x09, 'MOUSE'], ]) self.test_add_monitor(monitor2, expected_activate=True) self.test_remove_monitor(monitor2) monitor2.update_rssi([-40, 10, -60, 10]) monitor2.update_patterns([ [0, 0x19, [0xc2, 0x03]], [0, 0x03, [0x12, 0x18]], ]) self.test_add_monitor(monitor2, expected_activate=True) self.test_remove_monitor(monitor2) # Unregister the app, should not fail. self.test_unregister_app(app1) # Terminate the test app instance. self.test_exit_app(app1) def advmon_test_pattern_filter_only(self): """Test case: PATTERN_FILTER_ONLY Verify matching of advertisements w.r.t. various pattern values and different AD Data Types - Local Name Service UUID and Device Type. Test working of patterns filter matching with multiple clients, multiple monitors and suspend/resume, without RSSI filtering. """ self.test_is_adv_monitoring_supported() self.test_setup_peer_devices() # Create two test app instances. app1 = self.create_app() app2 = self.create_app() # Register both apps, should not fail. self.test_register_app(app1) self.test_register_app(app2) # Add monitors in both apps. monitor1 = TestMonitor(app1) monitor1.update_type('or_patterns') monitor1.update_patterns([ [5, 0x09, '_REF'], ]) monitor1.update_rssi([127, 0, 127, 0]) monitor2 = TestMonitor(app1) monitor2.update_type('or_patterns') monitor2.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) monitor2.update_rssi([127, 0, 127, 0]) monitor3 = TestMonitor(app2) monitor3.update_type('or_patterns') monitor3.update_patterns([ [0, 0x19, [0xc1, 0x03]], [0, 0x09, 'MOUSE'], ]) monitor3.update_rssi([127, 0, 127, 0]) monitor4 = TestMonitor(app2) monitor4.update_type('or_patterns') monitor4.update_patterns([ [0, 0x19, [0xc1, 0x03]], [0, 0x19, [0xc3, 0x03]], ]) monitor4.update_rssi([127, 0, 127, 0]) # Activate should get invoked. self.test_add_monitor(monitor1, expected_activate=True) self.test_add_monitor(monitor2, expected_activate=True) self.test_add_monitor(monitor3, expected_activate=True) self.test_add_monitor(monitor4, expected_activate=True) # DeviceFound for mouse should get triggered only for monitors # matching the adv pattern filter. self.test_start_peer_device_adv(self.peer_mouse, duration=5) self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS) self.test_device_found(monitor2, count=self.MULTIPLE_EVENTS) self.test_device_found(monitor3, count=self.MULTIPLE_EVENTS) # Device type 0xc203 should not match. self.test_device_found(monitor4, count=0) self.test_stop_peer_device_adv(self.peer_mouse) # Initiate suspend/resume. self.suspend_resume() # Remove a monitor from one app, shouldn't affect working of other # monitors or apps. self.test_remove_monitor(monitor1) # Reset event counts before next test. self.test_reset_event_count(monitor2) self.test_reset_event_count(monitor3) # DeviceFound for mouse should get triggered again for monitors # matching the adv pattern filter. self.test_start_peer_device_adv(self.peer_mouse, duration=5) self.test_device_found(monitor2, count=self.MULTIPLE_EVENTS) self.test_device_found(monitor3, count=self.MULTIPLE_EVENTS) self.test_stop_peer_device_adv(self.peer_mouse) # Terminate an app, shouldn't affect working of monitors in other apps. self.test_exit_app(app1) # Reset event counts before next test. self.test_reset_event_count(monitor3) # DeviceFound should get triggered for keyboard. self.test_start_peer_device_adv(self.peer_keybd, duration=5) self.test_device_found(monitor3, count=self.MULTIPLE_EVENTS) self.test_device_found(monitor4, count=self.MULTIPLE_EVENTS) self.test_stop_peer_device_adv(self.peer_keybd) # Unregister the running app, should not fail. self.test_unregister_app(app2) # Terminate the running test app instance. self.test_exit_app(app2) def advmon_test_pattern_filter_1(self): """Test case: PATTERN_FILTER_1 Verify matching of advertisements w.r.t. various pattern values and different AD Data Types - Local Name Service UUID and Device Type. """ self.test_is_adv_monitoring_supported(require_rssi_filtering = True) self.test_setup_peer_devices() # Create a test app instance. app1 = self.create_app() monitor1 = TestMonitor(app1) monitor1.update_type('or_patterns') monitor1.update_rssi([-60, 3, -80, 3]) # Register the app, should not fail. self.test_register_app(app1) monitor1.update_patterns([ [5, 0x09, '_REF'], ]) self.test_add_monitor(monitor1, expected_activate=True) # Local name 'KEYBD_REF' should match. self.test_start_peer_device_adv(self.peer_keybd, duration=5) self.test_device_found(monitor1, count=1) # Local name 'MOUSE_REF' should match. self.test_start_peer_device_adv(self.peer_mouse, duration=5) self.test_device_found(monitor1, count=2) self.test_stop_peer_device_adv(self.peer_keybd) self.test_stop_peer_device_adv(self.peer_mouse) self.test_remove_monitor(monitor1) monitor1.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) self.test_add_monitor(monitor1, expected_activate=True) # Service UUID 0x1812 should match. self.test_start_peer_device_adv(self.peer_keybd, duration=5) self.test_device_found(monitor1, count=1) # Service UUID 0x1812 should match. self.test_start_peer_device_adv(self.peer_mouse, duration=5) self.test_device_found(monitor1, count=2) self.test_stop_peer_device_adv(self.peer_keybd) self.test_stop_peer_device_adv(self.peer_mouse) self.test_remove_monitor(monitor1) monitor1.update_patterns([ [0, 0x19, [0xc1, 0x03]], [0, 0x09, 'MOUSE'], ]) self.test_add_monitor(monitor1, expected_activate=True) # Device type 0xc103 should match. self.test_start_peer_device_adv(self.peer_keybd, duration=5) self.test_device_found(monitor1, count=1) # Local name 'MOUSE_REF' should match. self.test_start_peer_device_adv(self.peer_mouse, duration=5) self.test_device_found(monitor1, count=2) self.test_stop_peer_device_adv(self.peer_keybd) self.test_stop_peer_device_adv(self.peer_mouse) self.test_remove_monitor(monitor1) monitor1.update_patterns([ [0, 0x19, [0xc1, 0x03]], [0, 0x19, [0xc3, 0x03]], ]) self.test_add_monitor(monitor1, expected_activate=True) # Device type 0xc103 should match. self.test_start_peer_device_adv(self.peer_keybd, duration=5) self.test_device_found(monitor1, count=1) # Device type 0xc203 should not match. self.test_start_peer_device_adv(self.peer_mouse, duration=5) self.test_device_found(monitor1, count=1) self.test_stop_peer_device_adv(self.peer_keybd) self.test_stop_peer_device_adv(self.peer_mouse) self.test_remove_monitor(monitor1) # Unregister the app, should not fail. self.test_unregister_app(app1) # Terminate the test app instance. self.test_exit_app(app1) def advmon_test_rssi_filter_1(self): """Test case: RSSI_FILTER_1 Verify unset RSSI filter and filter with no matching RSSI values. """ self.test_is_adv_monitoring_supported(require_rssi_filtering = True) self.test_setup_peer_devices() # Create a test app instance. app1 = self.create_app() monitor1 = TestMonitor(app1) monitor1.update_type('or_patterns') monitor1.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) # Register the app, should not fail. self.test_register_app(app1) monitor1.update_rssi([127, 0, 127, 0]) self.test_add_monitor(monitor1, expected_activate=True) # Unset RSSI filter, adv should match multiple times. self.test_start_peer_device_adv(self.peer_keybd, duration=5) self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS) # Unset RSSI filter, DeviceLost should not get triggered. self.test_stop_peer_device_adv(self.peer_keybd, duration=5) self.test_device_lost(monitor1, count=0) self.test_remove_monitor(monitor1) monitor1.update_rssi([-10, 5, -20, 5]) self.test_add_monitor(monitor1, expected_activate=True) # Adv RSSI lower than RSSI filter, DeviceFound should not get triggered. self.test_start_peer_device_adv(self.peer_keybd, duration=10) self.test_device_found(monitor1, count=0) # No device was found earlier, so DeviceLost should not get triggered. self.test_stop_peer_device_adv(self.peer_keybd, duration=10) self.test_device_lost(monitor1, count=0) self.test_remove_monitor(monitor1) # Unregister the app, should not fail. self.test_unregister_app(app1) # Terminate the test app instance. self.test_exit_app(app1) def advmon_test_rssi_filter_2(self): """Test case: RSSI_FILTER_2 Verify RSSI filter matching with multiple peer devices. """ self.test_is_adv_monitoring_supported(require_rssi_filtering = True) self.test_setup_peer_devices() # Create a test app instance. app1 = self.create_app() monitor1 = TestMonitor(app1) monitor1.update_type('or_patterns') monitor1.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) # Register the app, should not fail. self.test_register_app(app1) monitor1.update_rssi([-60, 3, -80, 3]) self.test_add_monitor(monitor1, expected_activate=True) # DeviceFound should get triggered only once per device. self.test_start_peer_device_adv(self.peer_keybd, duration=10) self.test_device_found(monitor1, count=1) # DeviceFound should get triggered for another device. self.test_start_peer_device_adv(self.peer_mouse, duration=10) self.test_device_found(monitor1, count=2) # DeviceLost should get triggered only once per device. self.test_stop_peer_device_adv(self.peer_keybd, duration=10) self.test_device_lost(monitor1, count=1) # DeviceLost should get triggered for another device. self.test_stop_peer_device_adv(self.peer_mouse, duration=10) self.test_device_lost(monitor1, count=2) self.test_remove_monitor(monitor1) monitor1.update_rssi([-60, 10, -80, 10]) self.test_add_monitor(monitor1, expected_activate=True) # Device was online for short period of time, so DeviceFound should # not get triggered. self.test_start_peer_device_adv(self.peer_keybd, duration=5) self.test_device_found(monitor1, count=0) # Device did not come back online, DeviceFound should not get triggered. # No device was found earlier, so DeviceLost should not get triggered. self.test_stop_peer_device_adv(self.peer_keybd, duration=15) self.test_device_found(monitor1, count=0) self.test_device_lost(monitor1, count=0) self.test_remove_monitor(monitor1) # Unregister the app, should not fail. self.test_unregister_app(app1) # Terminate the test app instance. self.test_exit_app(app1) def advmon_test_rssi_filter_3(self): """Test case: RSSI_FILTER_3 Verify reset of RSSI timers based on advertisements. """ self.test_is_adv_monitoring_supported(require_rssi_filtering = True) self.test_setup_peer_devices() # Create a test app instance. app1 = self.create_app() monitor1 = TestMonitor(app1) monitor1.update_type('or_patterns') monitor1.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) # Register the app, should not fail. self.test_register_app(app1) monitor1.update_rssi([-60, 10, -80, 10]) self.test_add_monitor(monitor1, expected_activate=True) # DeviceFound should not get triggered before timeout. self.test_start_peer_device_adv(self.peer_keybd, duration=5) self.test_device_found(monitor1, count=0) # DeviceFound should not get triggered as device went offline. # No device was found earlier, so DeviceLost should not get triggered. self.test_stop_peer_device_adv(self.peer_keybd, duration=10) self.test_device_found(monitor1, count=0) self.test_device_lost(monitor1, count=0) # Timer should get reset, so DeviceFound should not get triggered. self.test_start_peer_device_adv(self.peer_keybd, duration=5) self.test_device_found(monitor1, count=0) # DeviceFound should get triggered once timer completes. self.test_device_found(monitor1, count=1, delay=10) # DeviceLost should not get triggered before timeout. self.test_stop_peer_device_adv(self.peer_keybd, duration=5) self.test_device_lost(monitor1, count=0) # Timer should get reset, so DeviceLost should not get triggered. # DeviceFound should not get triggered as device is not lost yet. self.test_start_peer_device_adv(self.peer_keybd, duration=5) self.test_device_lost(monitor1, count=0) self.test_device_found(monitor1, count=1) # Timer should get reset, so DeviceLost should not get triggered. self.test_stop_peer_device_adv(self.peer_keybd, duration=5) self.test_device_lost(monitor1, count=0) # DeviceLost should get triggered once timer completes. self.test_device_lost(monitor1, count=1, delay=10) self.test_remove_monitor(monitor1) # Unregister the app, should not fail. self.test_unregister_app(app1) # Terminate the test app instance. self.test_exit_app(app1) def advmon_test_multi_client(self): """Test case: MULTI_CLIENT Verify working of patterns filter and RSSI filters with multiple clients and multiple monitors. """ self.test_is_adv_monitoring_supported(require_rssi_filtering = True) self.test_setup_peer_devices() # Create two test app instances. app1 = self.create_app() app2 = self.create_app() # Register both apps, should not fail. self.test_register_app(app1) self.test_register_app(app2) # Monitors with same pattern and RSSI filter values in both apps. monitor1 = TestMonitor(app1) monitor1.update_type('or_patterns') monitor1.update_patterns([ [0, 0x03, [0x12, 0x18]], [0, 0x19, [0xc1, 0x03]], ]) monitor1.update_rssi([-60, 3, -80, 3]) monitor2 = TestMonitor(app2) monitor2.update_type('or_patterns') monitor2.update_patterns([ [0, 0x03, [0x12, 0x18]], [0, 0x19, [0xc1, 0x03]], ]) monitor2.update_rssi([-60, 3, -80, 3]) # Activate should get invoked. self.test_add_monitor(monitor1, expected_activate=True) self.test_add_monitor(monitor2, expected_activate=True) # DeviceFound should get triggered for keyboard. self.test_start_peer_device_adv(self.peer_keybd, duration=5) self.test_device_found(monitor1, count=1) self.test_device_found(monitor2, count=1) self.test_stop_peer_device_adv(self.peer_keybd) # Remove a monitor from one app. self.test_remove_monitor(monitor1) # Monitors with same pattern but different RSSI filter values. monitor3 = TestMonitor(app1) monitor3.update_type('or_patterns') monitor3.update_patterns([ [0, 0x19, [0xc2, 0x03]], ]) monitor3.update_rssi([-60, 3, -80, 3]) monitor4 = TestMonitor(app2) monitor4.update_type('or_patterns') monitor4.update_patterns([ [0, 0x19, [0xc2, 0x03]], ]) monitor4.update_rssi([-60, 10, -80, 10]) # Activate should get invoked. self.test_add_monitor(monitor3, expected_activate=True) self.test_add_monitor(monitor4, expected_activate=True) # DeviceFound should get triggered for mouse. self.test_start_peer_device_adv(self.peer_mouse, duration=5) self.test_device_found(monitor2, count=2) self.test_device_found(monitor3, count=1) # Since the RSSI timeouts are different for monitor4, DeviceFound # event should get triggered after total of 10 seconds. self.test_device_found(monitor4, count=0) self.test_device_found(monitor4, count=1, delay=5) self.test_stop_peer_device_adv(self.peer_mouse) # Unregister both apps, should not fail. self.test_unregister_app(app1) self.test_unregister_app(app2) # Terminate the both test app instances. self.test_exit_app(app1) self.test_exit_app(app2) def advmon_test_fg_bg_combination(self): """Test case: FG_BG_COMBINATION Verify background scanning and foreground scanning do not interfere working of each other. """ self.test_is_adv_monitoring_supported() self.test_setup_peer_devices() # Create a test app instance. app1 = self.create_app() monitor1 = TestMonitor(app1) monitor1.update_type('or_patterns') monitor1.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) monitor1.update_rssi([127, 0, 127, 0]) # Register the app, should not fail. self.test_register_app(app1) # Activate should get invoked. self.test_add_monitor(monitor1, expected_activate=True) # Pair/connect LE Mouse. self.test_start_peer_device_adv(self.peer_mouse, duration=5) time.sleep(self.PAIR_TEST_SLEEP_SECS) self.test_discover_device(self.peer_mouse.address) time.sleep(self.PAIR_TEST_SLEEP_SECS) self.test_pairing(self.peer_mouse.address, self.peer_mouse.pin) time.sleep(self.PAIR_TEST_SLEEP_SECS) self.test_connection_by_adapter(self.peer_mouse.address) self.test_connection_by_device(self.peer_mouse) # DeviceFound should get triggered for keyboard. self.test_reset_event_count(monitor1) self.test_start_peer_device_adv(self.peer_keybd, duration=5) self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS) self.test_stop_peer_device_adv(self.peer_keybd) # Start foreground scanning. self.test_start_discovery() # Disconnect LE mouse. self.test_disconnection_by_device(self.peer_mouse) # Remove the monitor. self.test_remove_monitor(monitor1) # Activate should get invoked. self.test_add_monitor(monitor1, expected_activate=True) # Connect LE mouse. self.test_connection_by_device(self.peer_mouse) # DeviceFound should get triggered for keyboard. self.test_reset_event_count(monitor1) self.test_start_peer_device_adv(self.peer_keybd, duration=10) self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS) self.test_stop_peer_device_adv(self.peer_keybd) # Stop foreground scanning. self.test_stop_discovery() # Disconnect LE mouse. self.test_disconnection_by_device(self.peer_mouse) # DeviceFound should get triggered for keyboard. self.test_reset_event_count(monitor1) self.test_start_peer_device_adv(self.peer_keybd, duration=5) self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS) self.test_stop_peer_device_adv(self.peer_keybd) # Remove the monitor. self.test_remove_monitor(monitor1) # Connect LE mouse. self.test_connection_by_device(self.peer_mouse) # Unregister the app, should not fail. self.test_unregister_app(app1) # Terminate the test app instance. self.test_exit_app(app1) def advmon_test_suspend_resume(self): """Test case: SUSPEND_RESUME Verify working of background scanning with suspend/resume. """ self.test_is_adv_monitoring_supported(require_rssi_filtering = True) self.test_setup_peer_devices() # Create two test app instances. app1 = self.create_app() app2 = self.create_app() # Register both apps, should not fail. self.test_register_app(app1) self.test_register_app(app2) # Add monitors in both apps. monitor1 = TestMonitor(app1) monitor1.update_type('or_patterns') monitor1.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) monitor1.update_rssi([-60, 3, -80, 3]) monitor2 = TestMonitor(app1) monitor2.update_type('or_patterns') monitor2.update_patterns([ [0, 0x19, [0xc2, 0x03]], ]) monitor2.update_rssi([-60, 10, -80, 10]) monitor3 = TestMonitor(app2) monitor3.update_type('or_patterns') monitor3.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) monitor3.update_rssi([-60, 3, -80, 3]) monitor4 = TestMonitor(app2) monitor4.update_type('or_patterns') monitor4.update_patterns([ [0, 0x19, [0xc2, 0x03]], ]) monitor4.update_rssi([-60, 15, -80, 15]) # Activate should get invoked. self.test_add_monitor(monitor1, expected_activate=True) self.test_add_monitor(monitor2, expected_activate=True) self.test_add_monitor(monitor3, expected_activate=True) self.test_add_monitor(monitor4, expected_activate=True) # DeviceFound for mouse should get triggered only for monitors # satisfying the RSSI timers. self.test_start_peer_device_adv(self.peer_mouse, duration=5) self.test_device_found(monitor1, count=1) self.test_device_found(monitor2, count=0) self.test_device_found(monitor3, count=1) self.test_device_found(monitor4, count=0) # Initiate suspend/resume. self.suspend_resume() # Remove a monitor from one app, shouldn't affect working of other # monitors or apps. self.test_remove_monitor(monitor1) # DeviceFound should get triggered for monitors with higher RSSI timers. self.test_device_found(monitor2, count=1, delay=10) self.test_device_found(monitor4, count=1, delay=5) self.test_stop_peer_device_adv(self.peer_mouse) # Terminate an app, shouldn't affect working of monitors in other apps. self.test_exit_app(app1) # DeviceFound should get triggered for keyboard. self.test_start_peer_device_adv(self.peer_keybd, duration=5) self.test_device_found(monitor3, count=2) self.test_stop_peer_device_adv(self.peer_keybd) # Unregister the running app, should not fail. self.test_unregister_app(app2) # Terminate the running test app instance. self.test_exit_app(app2) def advmon_test_interleaved_scan(self): """ Test cases for verifying interleave scan """ self.test_is_adv_monitoring_supported() # cycles to collect logs for tests expect no interleave scan EXPECT_FALSE_TEST_CYCLE = 3 # Create a test app instance. app1 = self.create_app() monitor1 = TestMonitor(app1) monitor1.update_type('or_patterns') monitor1.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) monitor1.update_rssi([127, 0, 127, 0]) # Register the app, should not fail. self.test_register_app(app1) # Activate should get invoked. self.test_add_monitor(monitor1, expected_activate=True) # No device in allowlist, interleave with idle self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) # No device in allowlist, interleave with idle, interrupted by active # scan self.test_start_discovery() self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) self.test_stop_discovery() self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) # No device in allowlist, interleave with idle, interrupted by suspend # resume self.suspend_resume() self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) # Pair/connect LE Mouse. device = self.devices['BLE_MOUSE'][0] self.test_discover_device(device.address) time.sleep(self.PAIR_TEST_SLEEP_SECS) self.test_pairing(device.address, device.pin, trusted=True) time.sleep(self.PAIR_TEST_SLEEP_SECS) # BLE_MOUSE in allowlist, interleave with allowlist passive scan self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) device.AdapterPowerOff() # Make sure the peer is disconnected self.test_device_is_not_connected(device.address) self.test_interleaving_state(True) # Interleaving with allowlist should get paused during active scan self.test_interleaving_active_scan_cycle() # Interleaving with allowlist should get resumed after stopping scan self.test_interleaving_state(True) # Interleaving with allowlist should get paused during system suspend, # get resumed after system awake self.test_interleaving_suspend_resume() self.test_interleaving_state(True) self.test_remove_monitor(monitor1) self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) # Unregister the app, should not fail. self.test_unregister_app(app1) # Terminate the test app instance. self.test_exit_app(app1)