# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from bumble.device import Device
from bumble.hci import HCI_Reset_Command


# -----------------------------------------------------------------------------
class Scanner:
    """Collect advertisements from a device and publish them to a listener.

    Results are kept in ``scan_entries`` (one entry per advertiser address) and
    pushed to the listener registered for the ``'update'`` event every time the
    set of entries changes.
    """

    class ScanEntry:
        """Display-oriented snapshot of a single advertisement."""

        # Labels indexed by the numeric address type of the advertiser.
        ADDRESS_TYPE_NAMES = (
            'Public',
            'Random',
            'Public Identity',
            'Random Identity',
        )

        def __init__(self, advertisement):
            """Extract the printable fields of ``advertisement``.

            Args:
                advertisement: object exposing ``address`` (with ``to_string``
                    and ``address_type``), ``rssi`` and ``data`` (with
                    ``to_string``).
            """
            self.address = advertisement.address.to_string(False)

            # Guard the lookup: an address type outside the labelled range
            # must not raise IndexError from inside the advertisement
            # callback, and a negative value must not wrap around to the
            # wrong label.
            address_type = advertisement.address.address_type
            if 0 <= address_type < len(self.ADDRESS_TYPE_NAMES):
                self.address_type = self.ADDRESS_TYPE_NAMES[address_type]
            else:
                self.address_type = f'Unknown ({address_type})'

            self.rssi = advertisement.rssi
            self.data = advertisement.data.to_string('\n')

    def __init__(self, hci_source, hci_sink):
        """Create the underlying device on top of the given HCI transport.

        Args:
            hci_source: HCI source handed to ``Device.with_hci``.
            hci_sink: HCI sink handed to ``Device.with_hci``.
        """
        super().__init__()
        self.device = Device.with_hci(
            'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink
        )
        # Latest ScanEntry for each advertiser, keyed by advertisement address.
        self.scan_entries = {}
        # One listener per event name; see `on`.
        self.listeners = {}
        self.device.on('advertisement', self.on_advertisement)

    async def start(self):
        """Clear previous results, power the device on and start scanning."""
        print('### Starting Scanner')
        self.scan_entries = {}
        # Publish the now-empty result set before any new advertisement lands.
        self.emit_update()
        await self.device.power_on()
        await self.device.start_scanning()
        print('### Scanner started')

    async def stop(self):
        """Reset the controller and power the device off."""
        # TODO: replace this once a proper reset is implemented in the lib.
        await self.device.host.send_command(HCI_Reset_Command())
        await self.device.power_off()
        print('### Scanner stopped')

    def emit_update(self):
        """Call the ``'update'`` listener, if any, with the current entries."""
        if listener := self.listeners.get('update'):
            listener(list(self.scan_entries.values()))

    def on(self, event_name, listener):
        """Register ``listener`` for ``event_name``.

        Only one listener is kept per event name; registering again replaces
        the previous one.
        """
        self.listeners[event_name] = listener

    def on_advertisement(self, advertisement):
        """Record ``advertisement`` under its address and publish an update."""
        self.scan_entries[advertisement.address] = self.ScanEntry(advertisement)
        self.emit_update()


# -----------------------------------------------------------------------------
def main(hci_source, hci_sink):
    """Build and return a `Scanner` bound to the given HCI source and sink."""
    return Scanner(hci_source, hci_sink)