• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import os
20import logging
21import click
22
23from bumble.colors import color
24from bumble.device import Device
25from bumble.transport import open_transport_or_link
26from bumble.keys import JsonKeyStore
27from bumble.smp import AddressResolver
28from bumble.device import Advertisement
29from bumble.hci import HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY
30
31
32# -----------------------------------------------------------------------------
33def make_rssi_bar(rssi):
34    DISPLAY_MIN_RSSI = -105
35    DISPLAY_MAX_RSSI = -30
36    DEFAULT_RSSI_BAR_WIDTH = 30
37
38    blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉']
39    bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
40    bar_width = min(max(bar_width, 0), 1)
41    bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
42    return ('█' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
43
44
45# -----------------------------------------------------------------------------
46class AdvertisementPrinter:
47    def __init__(self, min_rssi, resolver):
48        self.min_rssi = min_rssi
49        self.resolver = resolver
50
51    def print_advertisement(self, advertisement):
52        address = advertisement.address
53        address_color = 'yellow' if advertisement.is_connectable else 'red'
54
55        if self.min_rssi is not None and advertisement.rssi < self.min_rssi:
56            return
57
58        address_qualifier = ''
59        resolution_qualifier = ''
60        if self.resolver and advertisement.address.is_resolvable:
61            resolved = self.resolver.resolve(advertisement.address)
62            if resolved is not None:
63                resolution_qualifier = f'(resolved from {advertisement.address})'
64                address = resolved
65
66        address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[
67            address.address_type
68        ]
69        if address.is_public:
70            type_color = 'cyan'
71        else:
72            if address.is_static:
73                type_color = 'green'
74                address_qualifier = '(static)'
75            elif address.is_resolvable:
76                type_color = 'magenta'
77                address_qualifier = '(resolvable)'
78            else:
79                type_color = 'blue'
80                address_qualifier = '(non-resolvable)'
81
82        separator = '\n  '
83        rssi_bar = make_rssi_bar(advertisement.rssi)
84        if not advertisement.is_legacy:
85            phy_info = (
86                f'PHY: {HCI_Constant.le_phy_name(advertisement.primary_phy)}/'
87                f'{HCI_Constant.le_phy_name(advertisement.secondary_phy)} '
88                f'{separator}'
89            )
90        else:
91            phy_info = ''
92
93        print(
94            f'>>> {color(address, address_color)} '
95            f'[{color(address_type_string, type_color)}]{address_qualifier}'
96            f'{resolution_qualifier}:{separator}'
97            f'{phy_info}'
98            f'RSSI:{advertisement.rssi:4} {rssi_bar}{separator}'
99            f'{advertisement.data.to_string(separator)}\n'
100        )
101
102    def on_advertisement(self, advertisement):
103        self.print_advertisement(advertisement)
104
105    def on_advertising_report(self, report):
106        print(f'{color("EVENT", "green")}: {report.event_type_string()}')
107        self.print_advertisement(Advertisement.from_advertising_report(report))
108
109
110# -----------------------------------------------------------------------------
111async def scan(
112    min_rssi,
113    passive,
114    scan_interval,
115    scan_window,
116    phy,
117    filter_duplicates,
118    raw,
119    keystore_file,
120    device_config,
121    transport,
122):
123    print('<<< connecting to HCI...')
124    async with await open_transport_or_link(transport) as (hci_source, hci_sink):
125        print('<<< connected')
126
127        if device_config:
128            device = Device.from_config_file_with_hci(
129                device_config, hci_source, hci_sink
130            )
131        else:
132            device = Device.with_hci(
133                'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink
134            )
135
136        if keystore_file:
137            keystore = JsonKeyStore(namespace=None, filename=keystore_file)
138            device.keystore = keystore
139        else:
140            resolver = None
141
142        if device.keystore:
143            resolving_keys = await device.keystore.get_resolving_keys()
144            resolver = AddressResolver(resolving_keys)
145
146        printer = AdvertisementPrinter(min_rssi, resolver)
147        if raw:
148            device.host.on('advertising_report', printer.on_advertising_report)
149        else:
150            device.on('advertisement', printer.on_advertisement)
151
152        await device.power_on()
153
154        if phy is None:
155            scanning_phys = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY]
156        else:
157            scanning_phys = [{'1m': HCI_LE_1M_PHY, 'coded': HCI_LE_CODED_PHY}[phy]]
158
159        await device.start_scanning(
160            active=(not passive),
161            scan_interval=scan_interval,
162            scan_window=scan_window,
163            filter_duplicates=filter_duplicates,
164            scanning_phys=scanning_phys,
165        )
166
167        await hci_source.wait_for_termination()
168
169
170# -----------------------------------------------------------------------------
171@click.command()
172@click.option('--min-rssi', type=int, help='Minimum RSSI value')
173@click.option('--passive', is_flag=True, default=False, help='Perform passive scanning')
174@click.option('--scan-interval', type=int, default=60, help='Scan interval')
175@click.option('--scan-window', type=int, default=60, help='Scan window')
176@click.option(
177    '--phy', type=click.Choice(['1m', 'coded']), help='Only scan on the specified PHY'
178)
179@click.option(
180    '--filter-duplicates',
181    type=bool,
182    default=True,
183    help='Filter duplicates at the controller level',
184)
185@click.option(
186    '--raw',
187    is_flag=True,
188    default=False,
189    help='Listen for raw advertising reports instead of processed ones',
190)
191@click.option('--keystore-file', help='Keystore file to use when resolving addresses')
192@click.option('--device-config', help='Device config file for the scanning device')
193@click.argument('transport')
194def main(
195    min_rssi,
196    passive,
197    scan_interval,
198    scan_window,
199    phy,
200    filter_duplicates,
201    raw,
202    keystore_file,
203    device_config,
204    transport,
205):
206    logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
207    asyncio.run(
208        scan(
209            min_rssi,
210            passive,
211            scan_interval,
212            scan_window,
213            phy,
214            filter_duplicates,
215            raw,
216            keystore_file,
217            device_config,
218            transport,
219        )
220    )
221
222
223# -----------------------------------------------------------------------------
224if __name__ == '__main__':
225    main()
226