# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import sys
20import os
21import logging
22import struct
23import json
24import websockets
25from bumble.colors import color
26
27from bumble.core import AdvertisingData
28from bumble.device import Device, Connection, Peer
29from bumble.utils import AsyncRunner
30from bumble.transport import open_transport_or_link
31from bumble.gatt import (
32    Descriptor,
33    Service,
34    Characteristic,
35    CharacteristicValue,
36    GATT_DEVICE_INFORMATION_SERVICE,
37    GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
38    GATT_BATTERY_SERVICE,
39    GATT_BATTERY_LEVEL_CHARACTERISTIC,
40    GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
41    GATT_REPORT_CHARACTERISTIC,
42    GATT_REPORT_MAP_CHARACTERISTIC,
43    GATT_PROTOCOL_MODE_CHARACTERISTIC,
44    GATT_HID_INFORMATION_CHARACTERISTIC,
45    GATT_HID_CONTROL_POINT_CHARACTERISTIC,
46    GATT_REPORT_REFERENCE_DESCRIPTOR,
47)
48
49# -----------------------------------------------------------------------------
50
# Protocol modes, as stored in the Protocol Mode characteristic value
HID_BOOT_PROTOCOL, HID_REPORT_PROTOCOL = 0x00, 0x01

# Report types, as stored in the second byte of a Report Reference descriptor
HID_INPUT_REPORT, HID_OUTPUT_REPORT, HID_FEATURE_REPORT = 0x01, 0x02, 0x03
59
# Report Map: the HID report descriptor exposed by the keyboard (Report ID 1).
# Each row is one descriptor item: a prefix byte followed by its data byte.
HID_KEYBOARD_REPORT_MAP = bytes(
    # pylint: disable=line-too-long
    [
        0x05, 0x01,  # Usage Page (Generic Desktop Controls)
        0x09, 0x06,  # Usage (Keyboard)
        0xA1, 0x01,  # Collection (Application)
        0x85, 0x01,  # . Report ID (1)
        0x05, 0x07,  # . Usage Page (Keyboard/Keypad)
        0x19, 0xE0,  # . Usage Minimum (0xE0)
        0x29, 0xE7,  # . Usage Maximum (0xE7)
        0x15, 0x00,  # . Logical Minimum (0)
        0x25, 0x01,  # . Logical Maximum (1)
        0x75, 0x01,  # . Report Size (1)
        0x95, 0x08,  # . Report Count (8)
        0x81, 0x02,  # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
        0x95, 0x01,  # . Report Count (1)
        0x75, 0x08,  # . Report Size (8)
        0x81, 0x01,  # . Input (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
        0x95, 0x06,  # . Report Count (6)
        0x75, 0x08,  # . Report Size (8)
        0x15, 0x00,  # . Logical Minimum (0x00)
        0x25, 0x94,  # . Logical Maximum (0x94)
        0x05, 0x07,  # . Usage Page (Keyboard/Keypad)
        0x19, 0x00,  # . Usage Minimum (0x00)
        0x29, 0x94,  # . Usage Maximum (0x94)
        0x81, 0x00,  # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
        0x95, 0x05,  # . Report Count (5)
        0x75, 0x01,  # . Report Size (1)
        0x05, 0x08,  # . Usage Page (LEDs)
        0x19, 0x01,  # . Usage Minimum (Num Lock)
        0x29, 0x05,  # . Usage Maximum (Kana)
        0x91, 0x02,  # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
        0x95, 0x01,  # . Report Count (1)
        0x75, 0x03,  # . Report Size (3)
        0x91, 0x01,  # . Output (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
        0xC0,  # End Collection
    ]
)
131
132
133# -----------------------------------------------------------------------------
# pylint: disable=invalid-overridden-method
class ServerListener(Device.Listener, Connection.Listener):
    """Device and connection listener that logs connect/disconnect events."""

    def __init__(self, device):
        # Keep a handle on the device this listener is attached to
        self.device = device

    @AsyncRunner.run_in_task()
    async def on_connection(self, connection):
        """Log the new connection and register this object as its listener."""
        print(f'=== Connected to {connection}')
        # Listening on the connection too is what routes on_disconnection here
        connection.listener = self

    @AsyncRunner.run_in_task()
    async def on_disconnection(self, reason):
        """Log the reason reported for the disconnection."""
        print(f'### Disconnected, reason={reason}')
147
148
149# -----------------------------------------------------------------------------
def on_hid_control_point_write(_connection, value):
    """Print a value written to the HID Control Point characteristic.

    The connection argument is accepted but not used.
    """
    message = f'Control Point Write: {value}'
    print(message)
152
153
154# -----------------------------------------------------------------------------
def on_report(characteristic, value):
    """Print a received report as hex, followed by a label for its source.

    `characteristic` is only printed, so any printable label works; `value`
    must provide a `hex()` method.
    """
    heading = color('Report:', 'cyan')
    print(heading, value.hex(), 'from', characteristic)
157
158
159# -----------------------------------------------------------------------------
async def keyboard_host(device, peer_address):
    """Connect to a remote HID keyboard and print the reports it sends.

    Powers on `device`, connects to `peer_address` and pairs, then discovers the
    HID service and its characteristics. Every Report characteristic that
    supports notifications is subscribed to, and the Protocol Mode, HID
    Information and Report Map values are read and printed. The coroutine then
    awaits a future that nothing in this function completes.

    Returns early, after printing an error, if the HID service or any of the
    expected characteristics is missing.
    """
    await device.power_on()
    link = await device.connect(peer_address)
    await link.pair()
    peer = Peer(link)

    # The HID service must exist before its characteristics are enumerated
    await peer.discover_service(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)
    if not peer.get_services_by_uuid(GATT_HUMAN_INTERFACE_DEVICE_SERVICE):
        print(color('!!! No HID service', 'red'))
        return
    await peer.discover_characteristics()

    # Required characteristics, looked up in this order; the first one that is
    # missing aborts with its own error message
    lookups = (
        (GATT_PROTOCOL_MODE_CHARACTERISTIC, '!!! No Protocol Mode characteristic'),
        (
            GATT_HID_INFORMATION_CHARACTERISTIC,
            '!!! No HID Information characteristic',
        ),
        (GATT_REPORT_MAP_CHARACTERISTIC, '!!! No Report Map characteristic'),
        (
            GATT_HID_CONTROL_POINT_CHARACTERISTIC,
            '!!! No Control Point characteristic',
        ),
        (GATT_REPORT_CHARACTERISTIC, '!!! No Report characteristic'),
    )
    located = []
    for uuid, missing_message in lookups:
        matches = peer.get_characteristics_by_uuid(uuid)
        if not matches:
            print(color(missing_message, 'red'))
            return
        located.append(matches)
    # The control point only has to be present; it is not used afterwards
    protocol_modes, hid_informations, report_maps, _control_points, reports = located

    for index, report in enumerate(reports):
        print(color('REPORT:', 'yellow'), report)
        if not (report.properties & Characteristic.NOTIFY):
            continue

        await peer.discover_descriptors(report)
        reference_descriptor = report.get_descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR)
        if reference_descriptor:
            report_reference = await peer.read_value(reference_descriptor)
            print(color('  Report Reference:', 'blue'), report_reference.hex())
        else:
            # No descriptor: fall back to a zeroed two-byte reference
            report_reference = bytes([0, 0])

        # Bind the label as a default argument so each subscription keeps its own
        label = f'[{index}] {report_reference.hex()}'
        await peer.subscribe(
            report, lambda value, param=label: on_report(param, value)
        )

    protocol_mode = await peer.read_value(protocol_modes[0])
    print(f'Protocol Mode: {protocol_mode.hex()}')
    hid_information = await peer.read_value(hid_informations[0])
    print(f'HID Information: {hid_information.hex()}')
    report_map = await peer.read_value(report_maps[0])
    print(f'Report Map: {report_map.hex()}')

    # Stay alive so notifications keep being delivered
    await asyncio.get_running_loop().create_future()
237
238
239# -----------------------------------------------------------------------------
async def keyboard_device(device, command):
    """Run `device` as a GATT HID keyboard and feed it key presses.

    Registers the Device Information, HID and Battery services, sets the
    advertising data, attaches a `ServerListener`, powers the device on and
    starts advertising. Key presses then come from one of two sources:

    * `command == 'web'`: JSON messages (`{"type": "keydown", "key": "a"}` /
      `{"type": "keyup"}`) received by a WebSocket server on localhost:8989.
      Only the keys 'a' to 'z' are translated; malformed messages are skipped.
    * any other value: the letters of 'hello' are typed in a loop, one every
      3 seconds.

    Neither mode returns under normal operation.
    """
    # Create an 'input report' characteristic to send keyboard reports to the host
    input_report_characteristic = Characteristic(
        GATT_REPORT_CHARACTERISTIC,
        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([0, 0, 0, 0, 0, 0, 0, 0]),
        [
            Descriptor(
                GATT_REPORT_REFERENCE_DESCRIPTOR,
                Descriptor.READABLE,
                bytes([0x01, HID_INPUT_REPORT]),
            )
        ],
    )

    # Create an 'output report' characteristic to receive keyboard reports from the host
    output_report_characteristic = Characteristic(
        GATT_REPORT_CHARACTERISTIC,
        Characteristic.READ
        | Characteristic.WRITE
        | Characteristic.WRITE_WITHOUT_RESPONSE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([0]),
        [
            Descriptor(
                GATT_REPORT_REFERENCE_DESCRIPTOR,
                Descriptor.READABLE,
                bytes([0x01, HID_OUTPUT_REPORT]),
            )
        ],
    )

    async def send_key_report(key_code):
        # 8-byte input report with the key code in the third byte (the first
        # key slot); a key code of 0 yields the all-zero "key released" report
        input_report_characteristic.value = bytes([0, 0, key_code, 0, 0, 0, 0, 0])
        await device.notify_subscribers(input_report_characteristic)

    # Add the services to the GATT server
    device.add_services(
        [
            Service(
                GATT_DEVICE_INFORMATION_SERVICE,
                [
                    Characteristic(
                        GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
                        Characteristic.READ,
                        Characteristic.READABLE,
                        'Bumble',
                    )
                ],
            ),
            Service(
                GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
                [
                    Characteristic(
                        GATT_PROTOCOL_MODE_CHARACTERISTIC,
                        Characteristic.READ,
                        Characteristic.READABLE,
                        bytes([HID_REPORT_PROTOCOL]),
                    ),
                    Characteristic(
                        GATT_HID_INFORMATION_CHARACTERISTIC,
                        Characteristic.READ,
                        Characteristic.READABLE,
                        # bcdHID=1.1, bCountryCode=0x00,
                        # Flags=RemoteWake|NormallyConnectable
                        bytes([0x11, 0x01, 0x00, 0x03]),
                    ),
                    Characteristic(
                        GATT_HID_CONTROL_POINT_CHARACTERISTIC,
                        Characteristic.WRITE_WITHOUT_RESPONSE,
                        Characteristic.WRITEABLE,
                        CharacteristicValue(write=on_hid_control_point_write),
                    ),
                    Characteristic(
                        GATT_REPORT_MAP_CHARACTERISTIC,
                        Characteristic.READ,
                        Characteristic.READABLE,
                        HID_KEYBOARD_REPORT_MAP,
                    ),
                    input_report_characteristic,
                    output_report_characteristic,
                ],
            ),
            Service(
                GATT_BATTERY_SERVICE,
                [
                    Characteristic(
                        GATT_BATTERY_LEVEL_CHARACTERISTIC,
                        Characteristic.READ,
                        Characteristic.READABLE,
                        bytes([100]),
                    )
                ],
            ),
        ]
    )

    # Debug print
    for attribute in device.gatt_server.attributes:
        print(attribute)

    # Set the advertising data
    device.advertising_data = bytes(
        AdvertisingData(
            [
                (
                    AdvertisingData.COMPLETE_LOCAL_NAME,
                    bytes('Bumble Keyboard', 'utf-8'),
                ),
                (
                    AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
                    bytes(GATT_HUMAN_INTERFACE_DEVICE_SERVICE),
                ),
                (AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)),
                (AdvertisingData.FLAGS, bytes([0x05])),
            ]
        )
    )

    # Attach a listener
    device.listener = ServerListener(device)

    # Go!
    await device.power_on()
    await device.start_advertising(auto_restart=True)

    if command == 'web':
        # Start a Websocket server to receive events from a web page.
        # `_path` is optional so the handler can be called with or without the
        # request path, whichever the installed websockets version does.
        async def serve(websocket, _path=None):
            while True:
                try:
                    message = await websocket.recv()
                except websockets.exceptions.ConnectionClosedOK:
                    # The client closed the connection: stop serving it.
                    # Looping again would make recv() raise immediately, forever.
                    return
                print('Received: ', str(message))

                try:
                    parsed = json.loads(message)
                    message_type = parsed['type']
                except (ValueError, KeyError, TypeError):
                    # Not JSON, not an object, or no 'type': skip this message
                    # rather than tearing down the connection
                    print(color('!!! Ignoring malformed message', 'red'))
                    continue

                if message_type == 'keydown':
                    # Only deal with keys a to z for now
                    key = parsed.get('key')
                    if isinstance(key, str) and len(key) == 1:
                        code = ord(key)
                        if ord('a') <= code <= ord('z'):
                            await send_key_report(0x04 + code - ord('a'))
                elif message_type == 'keyup':
                    await send_key_report(0)

        # pylint: disable-next=no-member
        await websockets.serve(serve, 'localhost', 8989)
        # Keep the server running until the task is cancelled
        await asyncio.get_running_loop().create_future()
    else:
        message = bytes('hello', 'ascii')
        while True:
            for letter in message:
                await asyncio.sleep(3.0)

                # Keypress for the letter (0x61 is 'a', which maps to code 0x04)
                await send_key_report(0x04 + letter - 0x61)

                # Key release
                await send_key_report(0)
414
415
416# -----------------------------------------------------------------------------
async def main():
    """Parse the command line and run the example as a keyboard host or device.

    Expects `<device-config> <transport-spec> <command>` in `sys.argv`, where
    `<command>` is `connect <address>`, `web` or `sim`. If arguments are
    missing, the command is unknown, or `connect` has no address, the usage
    text is printed and the function returns without opening the transport.
    """
    arguments = sys.argv[1:]
    command = arguments[2] if len(arguments) >= 3 else None
    if command == 'connect':
        # 'connect' needs the peer address as a fourth argument; checking here
        # avoids an IndexError after the transport has already been opened
        usable = len(arguments) >= 4
    else:
        usable = command in ('sim', 'web')

    if not usable:
        print(
            'Usage: python keyboard.py <device-config> <transport-spec> <command>\n'
            '  where <command> is one of:\n'
            '  connect <address> (run a keyboard host, connecting to a keyboard)\n'
            '  web (run a keyboard with keypress input from a web page, '
            'see keyboard.html)\n'
            '  sim (run a keyboard simulation, emitting a canned sequence of '
            'keystrokes)'
        )
        print('example: python keyboard.py keyboard.json usb:0 sim')
        print(
            'example: python keyboard.py keyboard.json usb:0 connect A0:A1:A2:A3:A4:A5'
        )
        return

    async with await open_transport_or_link(arguments[1]) as (hci_source, hci_sink):
        # Create a device to manage the host
        device = Device.from_config_file_with_hci(arguments[0], hci_source, hci_sink)

        if command == 'connect':
            # Run as a Keyboard host
            await keyboard_host(device, arguments[3])
        else:
            # Run as a keyboard device ('sim' or 'web')
            await keyboard_device(device, command)
446
447
448# -----------------------------------------------------------------------------
# Log level comes from BUMBLE_LOGLEVEL (default DEBUG); then run the example
log_level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()
logging.basicConfig(level=log_level)
asyncio.run(main())
451