• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import os
20import logging
21import click
22from colors import color
23
24from bumble.device import Device
25from bumble.transport import open_transport_or_link
26from bumble.keys import JsonKeyStore
27from bumble.smp import AddressResolver
28from bumble.hci import HCI_LE_Advertising_Report_Event
29from bumble.core import AdvertisingData
30
31
32# -----------------------------------------------------------------------------
33def make_rssi_bar(rssi):
34    DISPLAY_MIN_RSSI       = -105
35    DISPLAY_MAX_RSSI       = -30
36    DEFAULT_RSSI_BAR_WIDTH = 30
37
38    blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉']
39    bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
40    bar_width = min(max(bar_width, 0), 1)
41    bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
42    return ('█' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
43
44
45# -----------------------------------------------------------------------------
46class AdvertisementPrinter:
47    def __init__(self, min_rssi, resolver):
48        self.min_rssi = min_rssi
49        self.resolver = resolver
50
51    def print_advertisement(self, address, address_color, ad_data, rssi):
52        if self.min_rssi is not None and rssi < self.min_rssi:
53            return
54
55        address_qualifier = ''
56        resolution_qualifier = ''
57        if self.resolver and address.is_resolvable:
58            resolved = self.resolver.resolve(address)
59            if resolved is not None:
60                resolution_qualifier = f'(resolved from {address})'
61                address = resolved
62
63        address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type]
64        if address.is_public:
65            type_color = 'cyan'
66        else:
67            if address.is_static:
68                type_color = 'green'
69                address_qualifier = '(static)'
70            elif address.is_resolvable:
71                type_color = 'magenta'
72                address_qualifier = '(resolvable)'
73            else:
74                type_color = 'blue'
75                address_qualifier = '(non-resolvable)'
76
77        rssi_bar = make_rssi_bar(rssi)
78        separator = '\n  '
79        print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}RSSI:{rssi:4} {rssi_bar}{separator}{ad_data.to_string(separator)}\n')
80
81    def on_advertisement(self, address, ad_data, rssi, connectable):
82        address_color = 'yellow' if connectable else 'red'
83        self.print_advertisement(address, address_color, ad_data, rssi)
84
85    def on_advertising_report(self, address, ad_data, rssi, event_type):
86        print(f'{color("EVENT", "green")}: {HCI_LE_Advertising_Report_Event.event_type_name(event_type)}')
87        ad_data = AdvertisingData.from_bytes(ad_data)
88        self.print_advertisement(address, 'yellow', ad_data, rssi)
89
90
91# -----------------------------------------------------------------------------
92async def scan(
93    min_rssi,
94    passive,
95    scan_interval,
96    scan_window,
97    filter_duplicates,
98    raw,
99    keystore_file,
100    device_config,
101    transport
102):
103    print('<<< connecting to HCI...')
104    async with await open_transport_or_link(transport) as (hci_source, hci_sink):
105        print('<<< connected')
106
107        if device_config:
108            device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
109        else:
110            device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
111
112        if keystore_file:
113            keystore = JsonKeyStore(namespace=None, filename=keystore_file)
114            device.keystore = keystore
115        else:
116            resolver = None
117
118        if device.keystore:
119            resolving_keys = await device.keystore.get_resolving_keys()
120            resolver = AddressResolver(resolving_keys)
121
122        printer = AdvertisementPrinter(min_rssi, resolver)
123        if raw:
124            device.host.on('advertising_report', printer.on_advertising_report)
125        else:
126            device.on('advertisement', printer.on_advertisement)
127
128        await device.power_on()
129        await device.start_scanning(
130            active=(not passive),
131            scan_interval=scan_interval,
132            scan_window=scan_window,
133            filter_duplicates=filter_duplicates
134        )
135
136        await hci_source.wait_for_termination()
137
138
139# -----------------------------------------------------------------------------
140@click.command()
141@click.option('--min-rssi', type=int, help='Minimum RSSI value')
142@click.option('--passive', is_flag=True, default=False, help='Perform passive scanning')
143@click.option('--scan-interval', type=int, default=60, help='Scan interval')
144@click.option('--scan-window', type=int, default=60, help='Scan window')
145@click.option('--filter-duplicates', type=bool, default=True, help='Filter duplicates at the controller level')
146@click.option('--raw', is_flag=True, default=False, help='Listen for raw advertising reports instead of processed ones')
147@click.option('--keystore-file', help='Keystore file to use when resolving addresses')
148@click.option('--device-config', help='Device config file for the scanning device')
149@click.argument('transport')
150def main(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport):
151    logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
152    asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport))
153
154
155# -----------------------------------------------------------------------------
156if __name__ == '__main__':
157    main()
158