• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import sys
20import os
21import logging
22import struct
23import websockets
24import json
25from colors import color
26
27from bumble.core import AdvertisingData
28from bumble.device import Device, Connection, Peer
29from bumble.utils import AsyncRunner
30from bumble.transport import open_transport_or_link
31from bumble.gatt import (
32    Descriptor,
33    Service,
34    Characteristic,
35    CharacteristicValue,
36    GATT_DEVICE_INFORMATION_SERVICE,
37    GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE,
38    GATT_DEVICE_BATTERY_SERVICE,
39    GATT_BATTERY_LEVEL_CHARACTERISTIC,
40    GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
41    GATT_REPORT_CHARACTERISTIC,
42    GATT_REPORT_MAP_CHARACTERISTIC,
43    GATT_PROTOCOL_MODE_CHARACTERISTIC,
44    GATT_HID_INFORMATION_CHARACTERISTIC,
45    GATT_HID_CONTROL_POINT_CHARACTERISTIC,
46    GATT_REPORT_REFERENCE_DESCRIPTOR
47)
48
49# -----------------------------------------------------------------------------
50
51# Protocol Modes
52HID_BOOT_PROTOCOL   = 0x00
53HID_REPORT_PROTOCOL = 0x01
54
55# Report Types
56HID_INPUT_REPORT   = 0x01
57HID_OUTPUT_REPORT  = 0x02
58HID_FEATURE_REPORT = 0x03
59
60# Report Map
61HID_KEYBOARD_REPORT_MAP = bytes([
62    0x05, 0x01,  # Usage Page (Generic Desktop Ctrls)
63    0x09, 0x06,  # Usage (Keyboard)
64    0xA1, 0x01,  # Collection (Application)
65    0x85, 0x01,  # . Report ID (1)
66    0x05, 0x07,  # . Usage Page (Kbrd/Keypad)
67    0x19, 0xE0,  # . Usage Minimum (0xE0)
68    0x29, 0xE7,  # . Usage Maximum (0xE7)
69    0x15, 0x00,  # . Logical Minimum (0)
70    0x25, 0x01,  # . Logical Maximum (1)
71    0x75, 0x01,  # . Report Size (1)
72    0x95, 0x08,  # . Report Count (8)
73    0x81, 0x02,  # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
74    0x95, 0x01,  # . Report Count (1)
75    0x75, 0x08,  # . Report Size (8)
76    0x81, 0x01,  # . Input (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
77    0x95, 0x06,  # . Report Count (6)
78    0x75, 0x08,  # . Report Size (8)
79    0x15, 0x00,  # . Logical Minimum (0x00)
80    0x25, 0x94,  # . Logical Maximum (0x94)
81    0x05, 0x07,  # . Usage Page (Kbrd/Keypad)
82    0x19, 0x00,  # . Usage Minimum (0x00)
83    0x29, 0x94,  # . Usage Maximum (0x94)
84    0x81, 0x00,  # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
85    0x95, 0x05,  # . Report Count (5)
86    0x75, 0x01,  # . Report Size (1)
87    0x05, 0x08,  # . Usage Page (LEDs)
88    0x19, 0x01,  # . Usage Minimum (Num Lock)
89    0x29, 0x05,  # . Usage Maximum (Kana)
90    0x91, 0x02,  # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
91    0x95, 0x01,  # . Report Count (1)
92    0x75, 0x03,  # . Report Size (3)
93    0x91, 0x01,  # . Output (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
94    0xC0         # End Collection
95])
96
97
98# -----------------------------------------------------------------------------
99class ServerListener(Device.Listener, Connection.Listener):
100    def __init__(self, device):
101        self.device = device
102
103    @AsyncRunner.run_in_task()
104    async def on_connection(self, connection):
105        print(f'=== Connected to {connection}')
106        connection.listener = self
107
108    @AsyncRunner.run_in_task()
109    async def on_disconnection(self, reason):
110        print(f'### Disconnected, reason={reason}')
111
112
113# -----------------------------------------------------------------------------
114def on_hid_control_point_write(connection, value):
115    print(f'Control Point Write: {value}')
116
117
118# -----------------------------------------------------------------------------
119def on_report(characteristic, value):
120    print(color('Report:', 'cyan'), value.hex(), 'from', characteristic)
121
122
123# -----------------------------------------------------------------------------
124async def keyboard_host(device, peer_address):
125    await device.power_on()
126    connection = await device.connect(peer_address)
127    await connection.pair()
128    peer = Peer(connection)
129    await peer.discover_service(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)
130    hid_services = peer.get_services_by_uuid(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)
131    if not hid_services:
132        print(color('!!! No HID service', 'red'))
133        return
134    await peer.discover_characteristics()
135
136    protocol_mode_characteristics = peer.get_characteristics_by_uuid(GATT_PROTOCOL_MODE_CHARACTERISTIC)
137    if not protocol_mode_characteristics:
138        print(color('!!! No Protocol Mode characteristic', 'red'))
139        return
140    protocol_mode_characteristic = protocol_mode_characteristics[0]
141
142    hid_information_characteristics = peer.get_characteristics_by_uuid(GATT_HID_INFORMATION_CHARACTERISTIC)
143    if not hid_information_characteristics:
144        print(color('!!! No HID Information characteristic', 'red'))
145        return
146    hid_information_characteristic = hid_information_characteristics[0]
147
148    report_map_characteristics = peer.get_characteristics_by_uuid(GATT_REPORT_MAP_CHARACTERISTIC)
149    if not report_map_characteristics:
150        print(color('!!! No Report Map characteristic', 'red'))
151        return
152    report_map_characteristic = report_map_characteristics[0]
153
154    control_point_characteristics = peer.get_characteristics_by_uuid(GATT_HID_CONTROL_POINT_CHARACTERISTIC)
155    if not control_point_characteristics:
156        print(color('!!! No Control Point characteristic', 'red'))
157        return
158    # control_point_characteristic = control_point_characteristics[0]
159
160    report_characteristics = peer.get_characteristics_by_uuid(GATT_REPORT_CHARACTERISTIC)
161    if not report_characteristics:
162        print(color('!!! No Report characteristic', 'red'))
163        return
164    for i, characteristic in enumerate(report_characteristics):
165        print(color('REPORT:', 'yellow'), characteristic)
166        if characteristic.properties & Characteristic.NOTIFY:
167            await peer.discover_descriptors(characteristic)
168            report_reference_descriptor = characteristic.get_descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR)
169            if report_reference_descriptor:
170                report_reference = await peer.read_value(report_reference_descriptor)
171                print(color('  Report Reference:', 'blue'), report_reference.hex())
172            else:
173                report_reference = bytes([0, 0])
174            await peer.subscribe(characteristic, lambda value, param=f'[{i}] {report_reference.hex()}': on_report(param, value))
175
176    protocol_mode = await peer.read_value(protocol_mode_characteristic)
177    print(f'Protocol Mode: {protocol_mode.hex()}')
178    hid_information = await peer.read_value(hid_information_characteristic)
179    print(f'HID Information: {hid_information.hex()}')
180    report_map = await peer.read_value(report_map_characteristic)
181    print(f'Report Map: {report_map.hex()}')
182
183    await asyncio.get_running_loop().create_future()
184
185
186# -----------------------------------------------------------------------------
187async def keyboard_device(device, command):
188    # Create an 'input report' characteristic to send keyboard reports to the host
189    input_report_characteristic = Characteristic(
190        GATT_REPORT_CHARACTERISTIC,
191        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
192        Characteristic.READABLE | Characteristic.WRITEABLE,
193        bytes([0, 0, 0, 0, 0, 0, 0, 0]),
194        [
195            Descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR, Descriptor.READABLE, bytes([0x01, HID_INPUT_REPORT]))
196        ]
197    )
198
199    # Create an 'output report' characteristic to receive keyboard reports from the host
200    output_report_characteristic = Characteristic(
201        GATT_REPORT_CHARACTERISTIC,
202        Characteristic.READ | Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
203        Characteristic.READABLE | Characteristic.WRITEABLE,
204        bytes([0]),
205        [
206            Descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR, Descriptor.READABLE, bytes([0x01, HID_OUTPUT_REPORT]))
207        ]
208    )
209
210    # Add the services to the GATT sever
211    device.add_services([
212        Service(
213            GATT_DEVICE_INFORMATION_SERVICE,
214            [
215                Characteristic(
216                    GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
217                    Characteristic.READ,
218                    Characteristic.READABLE,
219                    'Bumble'
220                )
221            ]
222        ),
223        Service(
224            GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE,
225            [
226                Characteristic(
227                    GATT_PROTOCOL_MODE_CHARACTERISTIC,
228                    Characteristic.READ,
229                    Characteristic.READABLE,
230                    bytes([HID_REPORT_PROTOCOL])
231                ),
232                Characteristic(
233                    GATT_HID_INFORMATION_CHARACTERISTIC,
234                    Characteristic.READ,
235                    Characteristic.READABLE,
236                    bytes([0x11, 0x01, 0x00, 0x03])  # bcdHID=1.1, bCountryCode=0x00, Flags=RemoteWake|NormallyConnectable
237                ),
238                Characteristic(
239                    GATT_HID_CONTROL_POINT_CHARACTERISTIC,
240                    Characteristic.WRITE_WITHOUT_RESPONSE,
241                    Characteristic.WRITEABLE,
242                    CharacteristicValue(write=on_hid_control_point_write)
243                ),
244                Characteristic(
245                    GATT_REPORT_MAP_CHARACTERISTIC,
246                    Characteristic.READ,
247                    Characteristic.READABLE,
248                    HID_KEYBOARD_REPORT_MAP
249                ),
250                input_report_characteristic,
251                output_report_characteristic
252            ]
253        ),
254        Service(
255            GATT_DEVICE_BATTERY_SERVICE,
256            [
257                Characteristic(
258                    GATT_BATTERY_LEVEL_CHARACTERISTIC,
259                    Characteristic.READ,
260                    Characteristic.READABLE,
261                    bytes([100])
262                )
263            ]
264        )
265    ])
266
267    # Debug print
268    for attribute in device.gatt_server.attributes:
269        print(attribute)
270
271    # Set the advertising data
272    device.advertising_data = bytes(
273        AdvertisingData([
274            (AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Keyboard', 'utf-8')),
275            (AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
276                bytes(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)),
277            (AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)),
278            (AdvertisingData.FLAGS, bytes([0x05]))
279        ])
280    )
281
282    # Attach a listener
283    device.listener = ServerListener(device)
284
285    # Go!
286    await device.power_on()
287    await device.start_advertising(auto_restart=True)
288
289    if command == 'web':
290        # Start a Websocket server to receive events from a web page
291        async def serve(websocket, path):
292            while True:
293                try:
294                    message = await websocket.recv()
295                    print('Received: ', str(message))
296
297                    parsed = json.loads(message)
298                    message_type = parsed['type']
299                    if message_type == 'keydown':
300                        # Only deal with keys a to z for now
301                        key = parsed['key']
302                        if len(key) == 1:
303                            code = ord(key)
304                            if code >= ord('a') and code <= ord('z'):
305                                hid_code = 0x04 + code - ord('a')
306                                input_report_characteristic.value = bytes([0, 0, hid_code, 0, 0, 0, 0, 0])
307                                await device.notify_subscribers(input_report_characteristic)
308                    elif message_type == 'keyup':
309                        input_report_characteristic.value = bytes.fromhex('0000000000000000')
310                        await device.notify_subscribers(input_report_characteristic)
311
312                except websockets.exceptions.ConnectionClosedOK:
313                    pass
314        await websockets.serve(serve, 'localhost', 8989)
315        await asyncio.get_event_loop().create_future()
316    else:
317        message = bytes('hello', 'ascii')
318        while True:
319            for letter in message:
320                await asyncio.sleep(3.0)
321
322                # Keypress for the letter
323                keycode = 0x04 + letter - 0x61
324                input_report_characteristic.value = bytes([0, 0, keycode, 0, 0, 0, 0, 0])
325                await device.notify_subscribers(input_report_characteristic)
326
327                # Key release
328                input_report_characteristic.value = bytes.fromhex('0000000000000000')
329                await device.notify_subscribers(input_report_characteristic)
330
331
332# -----------------------------------------------------------------------------
333async def main():
334    if len(sys.argv) < 4:
335        print('Usage: python keyboard.py <device-config> <transport-spec> <command>')
336        print('  where <command> is one of:')
337        print('  connect <address> (run a keyboard host, connecting to a keyboard)')
338        print('  web (run a keyboard with keypress input from a web page, see keyboard.html')
339        print('  sim (run a keyboard simulation, emitting a canned sequence of keystrokes')
340        print('example: python keyboard.py keyboard.json usb:0 sim')
341        print('example: python keyboard.py keyboard.json usb:0 connect A0:A1:A2:A3:A4:A5')
342        return
343
344    async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
345        # Create a device to manage the host
346        device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
347
348        command = sys.argv[3]
349        if command == 'connect':
350            # Run as a Keyboard host
351            await keyboard_host(device, sys.argv[4])
352        elif command in {'sim', 'web'}:
353            # Run as a keyboard device
354            await keyboard_device(device, command)
355
356
357# -----------------------------------------------------------------------------
358logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
359asyncio.run(main())
360