• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from bumble.device import Device
19from bumble.hci import HCI_Reset_Command
20
21
22# -----------------------------------------------------------------------------
class Scanner:
    """Collect BLE advertisements through a Bumble ``Device`` and report them.

    Each advertisement received from the device is turned into a ``ScanEntry``
    and stored in ``scan_entries`` keyed by the advertisement's address. After
    every change, the listener registered for the ``'update'`` event (if any)
    is called with a list of the current entries.
    """

    class ScanEntry:
        """Display-oriented snapshot of a single advertisement."""

        # Labels indexed by the numeric ``address_type`` of the address.
        ADDRESS_TYPE_NAMES = (
            'Public',
            'Random',
            'Public Identity',
            'Random Identity',
        )

        def __init__(self, advertisement):
            """Extract the displayable fields from ``advertisement``.

            Args:
                advertisement: object exposing ``address`` (with ``to_string``
                    and ``address_type``), ``rssi`` and ``data`` (with
                    ``to_string``), as delivered by the device's
                    ``'advertisement'`` event.
            """
            address = advertisement.address
            # NOTE(review): the ``False`` argument presumably suppresses the
            # address-type suffix in the rendered string -- confirm against
            # the Address API.
            self.address = address.to_string(False)

            # Address types outside the four known values (e.g. the 0xFF
            # "anonymous" type of extended advertising reports) must not raise
            # from inside the advertisement handler, and negative values must
            # not silently wrap around to the wrong label.
            type_index = address.address_type
            if 0 <= type_index < len(self.ADDRESS_TYPE_NAMES):
                self.address_type = self.ADDRESS_TYPE_NAMES[type_index]
            else:
                self.address_type = f'Unknown ({type_index})'

            self.rssi = advertisement.rssi
            self.data = advertisement.data.to_string('\n')

    def __init__(self, hci_source, hci_sink):
        """Create the underlying device and subscribe to its advertisements.

        Args:
            hci_source: HCI source passed through to ``Device.with_hci``.
            hci_sink: HCI sink passed through to ``Device.with_hci``.
        """
        super().__init__()
        self.device = Device.with_hci(
            'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink
        )
        # Maps advertisement address -> ScanEntry.
        self.scan_entries = {}
        # Maps event name -> the single listener registered for that event.
        self.listeners = {}
        self.device.on('advertisement', self.on_advertisement)

    async def start(self):
        """Clear previous results, power on the device and start scanning."""
        print('### Starting Scanner')
        self.scan_entries = {}
        # Let the listener know the result list is empty before scanning.
        self.emit_update()
        await self.device.power_on()
        await self.device.start_scanning()
        print('### Scanner started')

    async def stop(self):
        """Reset the controller and power the device off."""
        # TODO: replace this once a proper reset is implemented in the lib.
        await self.device.host.send_command(HCI_Reset_Command())
        await self.device.power_off()
        print('### Scanner stopped')

    def emit_update(self):
        """Call the ``'update'`` listener, if set, with the current entries."""
        if listener := self.listeners.get('update'):
            listener(list(self.scan_entries.values()))

    def on(self, event_name, listener):
        """Register ``listener`` for ``event_name``.

        Only one listener is kept per event name; registering again replaces
        the previous one.
        """
        self.listeners[event_name] = listener

    def on_advertisement(self, advertisement):
        """Store the advertisement under its address and emit an update.

        An entry already stored under the same address key is replaced.
        """
        self.scan_entries[advertisement.address] = self.ScanEntry(advertisement)
        self.emit_update()
69
70
71# -----------------------------------------------------------------------------
def main(hci_source, hci_sink):
    """Build and return a ``Scanner`` bound to the given HCI endpoints.

    Args:
        hci_source: HCI source forwarded to the ``Scanner`` constructor.
        hci_sink: HCI sink forwarded to the ``Scanner`` constructor.

    Returns:
        The newly created ``Scanner``; it is not started here.
    """
    scanner = Scanner(hci_source, hci_sink)
    return scanner
74