• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import sys
20import os
21import logging
22import struct
23import json
24import websockets
25from bumble.colors import color
26
27from bumble.core import AdvertisingData
28from bumble.device import Device, Connection, Peer
29from bumble.utils import AsyncRunner
30from bumble.transport import open_transport_or_link
31from bumble.gatt import (
32    Descriptor,
33    Service,
34    Characteristic,
35    CharacteristicValue,
36    GATT_DEVICE_INFORMATION_SERVICE,
37    GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
38    GATT_BATTERY_SERVICE,
39    GATT_BATTERY_LEVEL_CHARACTERISTIC,
40    GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
41    GATT_REPORT_CHARACTERISTIC,
42    GATT_REPORT_MAP_CHARACTERISTIC,
43    GATT_PROTOCOL_MODE_CHARACTERISTIC,
44    GATT_HID_INFORMATION_CHARACTERISTIC,
45    GATT_HID_CONTROL_POINT_CHARACTERISTIC,
46    GATT_REPORT_REFERENCE_DESCRIPTOR,
47)
48
49# -----------------------------------------------------------------------------
50
51# Protocol Modes
52HID_BOOT_PROTOCOL = 0x00
53HID_REPORT_PROTOCOL = 0x01
54
55# Report Types
56HID_INPUT_REPORT = 0x01
57HID_OUTPUT_REPORT = 0x02
58HID_FEATURE_REPORT = 0x03
59
60# Report Map
61HID_KEYBOARD_REPORT_MAP = bytes(
62    # pylint: disable=line-too-long
63    [
64        0x05,
65        0x01,  # Usage Page (Generic Desktop Controls)
66        0x09,
67        0x06,  # Usage (Keyboard)
68        0xA1,
69        0x01,  # Collection (Application)
70        0x85,
71        0x01,  # . Report ID (1)
72        0x05,
73        0x07,  # . Usage Page (Keyboard/Keypad)
74        0x19,
75        0xE0,  # . Usage Minimum (0xE0)
76        0x29,
77        0xE7,  # . Usage Maximum (0xE7)
78        0x15,
79        0x00,  # . Logical Minimum (0)
80        0x25,
81        0x01,  # . Logical Maximum (1)
82        0x75,
83        0x01,  # . Report Size (1)
84        0x95,
85        0x08,  # . Report Count (8)
86        0x81,
87        0x02,  # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
88        0x95,
89        0x01,  # . Report Count (1)
90        0x75,
91        0x08,  # . Report Size (8)
92        0x81,
93        0x01,  # . Input (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
94        0x95,
95        0x06,  # . Report Count (6)
96        0x75,
97        0x08,  # . Report Size (8)
98        0x15,
99        0x00,  # . Logical Minimum (0x00)
100        0x25,
101        0x94,  # . Logical Maximum (0x94)
102        0x05,
103        0x07,  # . Usage Page (Keyboard/Keypad)
104        0x19,
105        0x00,  # . Usage Minimum (0x00)
106        0x29,
107        0x94,  # . Usage Maximum (0x94)
108        0x81,
109        0x00,  # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
110        0x95,
111        0x05,  # . Report Count (5)
112        0x75,
113        0x01,  # . Report Size (1)
114        0x05,
115        0x08,  # . Usage Page (LEDs)
116        0x19,
117        0x01,  # . Usage Minimum (Num Lock)
118        0x29,
119        0x05,  # . Usage Maximum (Kana)
120        0x91,
121        0x02,  # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
122        0x95,
123        0x01,  # . Report Count (1)
124        0x75,
125        0x03,  # . Report Size (3)
126        0x91,
127        0x01,  # . Output (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
128        0xC0,  # End Collection
129    ]
130)
131
132
133# -----------------------------------------------------------------------------
134# pylint: disable=invalid-overridden-method
135class ServerListener(Device.Listener, Connection.Listener):
136    def __init__(self, device):
137        self.device = device
138
139    @AsyncRunner.run_in_task()
140    async def on_connection(self, connection):
141        print(f'=== Connected to {connection}')
142        connection.listener = self
143
144    @AsyncRunner.run_in_task()
145    async def on_disconnection(self, reason):
146        print(f'### Disconnected, reason={reason}')
147
148
149# -----------------------------------------------------------------------------
150def on_hid_control_point_write(_connection, value):
151    print(f'Control Point Write: {value}')
152
153
154# -----------------------------------------------------------------------------
155def on_report(characteristic, value):
156    print(color('Report:', 'cyan'), value.hex(), 'from', characteristic)
157
158
159# -----------------------------------------------------------------------------
160async def keyboard_host(device, peer_address):
161    await device.power_on()
162    connection = await device.connect(peer_address)
163    await connection.pair()
164    peer = Peer(connection)
165    await peer.discover_service(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)
166    hid_services = peer.get_services_by_uuid(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)
167    if not hid_services:
168        print(color('!!! No HID service', 'red'))
169        return
170    await peer.discover_characteristics()
171
172    protocol_mode_characteristics = peer.get_characteristics_by_uuid(
173        GATT_PROTOCOL_MODE_CHARACTERISTIC
174    )
175    if not protocol_mode_characteristics:
176        print(color('!!! No Protocol Mode characteristic', 'red'))
177        return
178    protocol_mode_characteristic = protocol_mode_characteristics[0]
179
180    hid_information_characteristics = peer.get_characteristics_by_uuid(
181        GATT_HID_INFORMATION_CHARACTERISTIC
182    )
183    if not hid_information_characteristics:
184        print(color('!!! No HID Information characteristic', 'red'))
185        return
186    hid_information_characteristic = hid_information_characteristics[0]
187
188    report_map_characteristics = peer.get_characteristics_by_uuid(
189        GATT_REPORT_MAP_CHARACTERISTIC
190    )
191    if not report_map_characteristics:
192        print(color('!!! No Report Map characteristic', 'red'))
193        return
194    report_map_characteristic = report_map_characteristics[0]
195
196    control_point_characteristics = peer.get_characteristics_by_uuid(
197        GATT_HID_CONTROL_POINT_CHARACTERISTIC
198    )
199    if not control_point_characteristics:
200        print(color('!!! No Control Point characteristic', 'red'))
201        return
202    # control_point_characteristic = control_point_characteristics[0]
203
204    report_characteristics = peer.get_characteristics_by_uuid(
205        GATT_REPORT_CHARACTERISTIC
206    )
207    if not report_characteristics:
208        print(color('!!! No Report characteristic', 'red'))
209        return
210    for i, characteristic in enumerate(report_characteristics):
211        print(color('REPORT:', 'yellow'), characteristic)
212        if characteristic.properties & Characteristic.Properties.NOTIFY:
213            await peer.discover_descriptors(characteristic)
214            report_reference_descriptor = characteristic.get_descriptor(
215                GATT_REPORT_REFERENCE_DESCRIPTOR
216            )
217            if report_reference_descriptor:
218                report_reference = await peer.read_value(report_reference_descriptor)
219                print(color('  Report Reference:', 'blue'), report_reference.hex())
220            else:
221                report_reference = bytes([0, 0])
222            await peer.subscribe(
223                characteristic,
224                lambda value, param=f'[{i}] {report_reference.hex()}': on_report(
225                    param, value
226                ),
227            )
228
229    protocol_mode = await peer.read_value(protocol_mode_characteristic)
230    print(f'Protocol Mode: {protocol_mode.hex()}')
231    hid_information = await peer.read_value(hid_information_characteristic)
232    print(f'HID Information: {hid_information.hex()}')
233    report_map = await peer.read_value(report_map_characteristic)
234    print(f'Report Map: {report_map.hex()}')
235
236    await asyncio.get_running_loop().create_future()
237
238
239# -----------------------------------------------------------------------------
240async def keyboard_device(device, command):
241    # Create an 'input report' characteristic to send keyboard reports to the host
242    input_report_characteristic = Characteristic(
243        GATT_REPORT_CHARACTERISTIC,
244        Characteristic.Properties.READ
245        | Characteristic.Properties.WRITE
246        | Characteristic.Properties.NOTIFY,
247        Characteristic.READABLE | Characteristic.WRITEABLE,
248        bytes([0, 0, 0, 0, 0, 0, 0, 0]),
249        [
250            Descriptor(
251                GATT_REPORT_REFERENCE_DESCRIPTOR,
252                Descriptor.READABLE,
253                bytes([0x01, HID_INPUT_REPORT]),
254            )
255        ],
256    )
257
258    # Create an 'output report' characteristic to receive keyboard reports from the host
259    output_report_characteristic = Characteristic(
260        GATT_REPORT_CHARACTERISTIC,
261        Characteristic.Properties.READ
262        | Characteristic.Properties.WRITE
263        | Characteristic.WRITE_WITHOUT_RESPONSE,
264        Characteristic.READABLE | Characteristic.WRITEABLE,
265        bytes([0]),
266        [
267            Descriptor(
268                GATT_REPORT_REFERENCE_DESCRIPTOR,
269                Descriptor.READABLE,
270                bytes([0x01, HID_OUTPUT_REPORT]),
271            )
272        ],
273    )
274
275    # Add the services to the GATT sever
276    device.add_services(
277        [
278            Service(
279                GATT_DEVICE_INFORMATION_SERVICE,
280                [
281                    Characteristic(
282                        GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
283                        Characteristic.Properties.READ,
284                        Characteristic.READABLE,
285                        'Bumble',
286                    )
287                ],
288            ),
289            Service(
290                GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
291                [
292                    Characteristic(
293                        GATT_PROTOCOL_MODE_CHARACTERISTIC,
294                        Characteristic.Properties.READ,
295                        Characteristic.READABLE,
296                        bytes([HID_REPORT_PROTOCOL]),
297                    ),
298                    Characteristic(
299                        GATT_HID_INFORMATION_CHARACTERISTIC,
300                        Characteristic.Properties.READ,
301                        Characteristic.READABLE,
302                        # bcdHID=1.1, bCountryCode=0x00,
303                        # Flags=RemoteWake|NormallyConnectable
304                        bytes([0x11, 0x01, 0x00, 0x03]),
305                    ),
306                    Characteristic(
307                        GATT_HID_CONTROL_POINT_CHARACTERISTIC,
308                        Characteristic.WRITE_WITHOUT_RESPONSE,
309                        Characteristic.WRITEABLE,
310                        CharacteristicValue(write=on_hid_control_point_write),
311                    ),
312                    Characteristic(
313                        GATT_REPORT_MAP_CHARACTERISTIC,
314                        Characteristic.Properties.READ,
315                        Characteristic.READABLE,
316                        HID_KEYBOARD_REPORT_MAP,
317                    ),
318                    input_report_characteristic,
319                    output_report_characteristic,
320                ],
321            ),
322            Service(
323                GATT_BATTERY_SERVICE,
324                [
325                    Characteristic(
326                        GATT_BATTERY_LEVEL_CHARACTERISTIC,
327                        Characteristic.Properties.READ,
328                        Characteristic.READABLE,
329                        bytes([100]),
330                    )
331                ],
332            ),
333        ]
334    )
335
336    # Debug print
337    for attribute in device.gatt_server.attributes:
338        print(attribute)
339
340    # Set the advertising data
341    device.advertising_data = bytes(
342        AdvertisingData(
343            [
344                (
345                    AdvertisingData.COMPLETE_LOCAL_NAME,
346                    bytes('Bumble Keyboard', 'utf-8'),
347                ),
348                (
349                    AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
350                    bytes(GATT_HUMAN_INTERFACE_DEVICE_SERVICE),
351                ),
352                (AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)),
353                (AdvertisingData.FLAGS, bytes([0x05])),
354            ]
355        )
356    )
357
358    # Attach a listener
359    device.listener = ServerListener(device)
360
361    # Go!
362    await device.power_on()
363    await device.start_advertising(auto_restart=True)
364
365    if command == 'web':
366        # Start a Websocket server to receive events from a web page
367        async def serve(websocket, _path):
368            while True:
369                try:
370                    message = await websocket.recv()
371                    print('Received: ', str(message))
372
373                    parsed = json.loads(message)
374                    message_type = parsed['type']
375                    if message_type == 'keydown':
376                        # Only deal with keys a to z for now
377                        key = parsed['key']
378                        if len(key) == 1:
379                            code = ord(key)
380                            if ord('a') <= code <= ord('z'):
381                                hid_code = 0x04 + code - ord('a')
382                                input_report_characteristic.value = bytes(
383                                    [0, 0, hid_code, 0, 0, 0, 0, 0]
384                                )
385                                await device.notify_subscribers(
386                                    input_report_characteristic
387                                )
388                    elif message_type == 'keyup':
389                        input_report_characteristic.value = bytes.fromhex(
390                            '0000000000000000'
391                        )
392                        await device.notify_subscribers(input_report_characteristic)
393
394                except websockets.exceptions.ConnectionClosedOK:
395                    pass
396
397        # pylint: disable-next=no-member
398        await websockets.serve(serve, 'localhost', 8989)
399        await asyncio.get_event_loop().create_future()
400    else:
401        message = bytes('hello', 'ascii')
402        while True:
403            for letter in message:
404                await asyncio.sleep(3.0)
405
406                # Keypress for the letter
407                keycode = 0x04 + letter - 0x61
408                input_report_characteristic.value = bytes(
409                    [0, 0, keycode, 0, 0, 0, 0, 0]
410                )
411                await device.notify_subscribers(input_report_characteristic)
412
413                # Key release
414                input_report_characteristic.value = bytes.fromhex('0000000000000000')
415                await device.notify_subscribers(input_report_characteristic)
416
417
418# -----------------------------------------------------------------------------
419async def main() -> None:
420    if len(sys.argv) < 4:
421        print(
422            'Usage: python keyboard.py <device-config> <transport-spec> <command>'
423            '  where <command> is one of:\n'
424            '  connect <address> (run a keyboard host, connecting to a keyboard)\n'
425            '  web (run a keyboard with keypress input from a web page, '
426            'see keyboard.html\n'
427        )
428        print(
429            '  sim (run a keyboard simulation, emitting a canned sequence of keystrokes'
430        )
431        print('example: python keyboard.py keyboard.json usb:0 sim')
432        print(
433            'example: python keyboard.py keyboard.json usb:0 connect A0:A1:A2:A3:A4:A5'
434        )
435        return
436
437    async with await open_transport_or_link(sys.argv[2]) as hci_transport:
438        # Create a device to manage the host
439        device = Device.from_config_file_with_hci(
440            sys.argv[1], hci_transport.source, hci_transport.sink
441        )
442
443        command = sys.argv[3]
444        if command == 'connect':
445            # Run as a Keyboard host
446            await keyboard_host(device, sys.argv[4])
447        elif command in ('sim', 'web'):
448            # Run as a keyboard device
449            await keyboard_device(device, command)
450
451
452# -----------------------------------------------------------------------------
453logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
454asyncio.run(main())
455