# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
import struct
import json
import websockets
from bumble.colors import color

from bumble.core import AdvertisingData
from bumble.device import Device, Connection, Peer
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import (
    Descriptor,
    Service,
    Characteristic,
    CharacteristicValue,
    GATT_DEVICE_INFORMATION_SERVICE,
    GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
    GATT_BATTERY_SERVICE,
    GATT_BATTERY_LEVEL_CHARACTERISTIC,
    GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
    GATT_REPORT_CHARACTERISTIC,
    GATT_REPORT_MAP_CHARACTERISTIC,
    GATT_PROTOCOL_MODE_CHARACTERISTIC,
    GATT_HID_INFORMATION_CHARACTERISTIC,
    GATT_HID_CONTROL_POINT_CHARACTERISTIC,
    GATT_REPORT_REFERENCE_DESCRIPTOR,
)

# -----------------------------------------------------------------------------

# Protocol Modes
HID_BOOT_PROTOCOL = 0x00
HID_REPORT_PROTOCOL = 0x01

# Report Types
HID_INPUT_REPORT = 0x01
HID_OUTPUT_REPORT = 0x02
HID_FEATURE_REPORT = 0x03

# Report Map
# Each line below is one (prefix, data) byte pair of the HID report descriptor.
HID_KEYBOARD_REPORT_MAP = bytes(
    # pylint: disable=line-too-long
    [
        0x05, 0x01,  # Usage Page (Generic Desktop Controls)
        0x09, 0x06,  # Usage (Keyboard)
        0xA1, 0x01,  # Collection (Application)
        0x85, 0x01,  # . Report ID (1)
        0x05, 0x07,  # . Usage Page (Keyboard/Keypad)
        0x19, 0xE0,  # . Usage Minimum (0xE0)
        0x29, 0xE7,  # . Usage Maximum (0xE7)
        0x15, 0x00,  # . Logical Minimum (0)
        0x25, 0x01,  # . Logical Maximum (1)
        0x75, 0x01,  # . Report Size (1)
        0x95, 0x08,  # . Report Count (8)
        0x81, 0x02,  # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
        0x95, 0x01,  # . Report Count (1)
        0x75, 0x08,  # . Report Size (8)
        0x81, 0x01,  # . Input (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
        0x95, 0x06,  # . Report Count (6)
        0x75, 0x08,  # . Report Size (8)
        0x15, 0x00,  # . Logical Minimum (0x00)
        0x25, 0x94,  # . Logical Maximum (0x94)
        0x05, 0x07,  # . Usage Page (Keyboard/Keypad)
        0x19, 0x00,  # . Usage Minimum (0x00)
        0x29, 0x94,  # . Usage Maximum (0x94)
        0x81, 0x00,  # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
        0x95, 0x05,  # . Report Count (5)
        0x75, 0x01,  # . Report Size (1)
        0x05, 0x08,  # . Usage Page (LEDs)
        0x19, 0x01,  # . Usage Minimum (Num Lock)
        0x29, 0x05,  # . Usage Maximum (Kana)
        0x91, 0x02,  # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
        0x95, 0x01,  # . Report Count (1)
        0x75, 0x03,  # . Report Size (3)
        0x91, 0x01,  # . Output (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
        0xC0,  # End Collection
    ]
)

# Commands accepted as the third command-line argument
SUPPORTED_COMMANDS = ('connect', 'sim', 'web')


# -----------------------------------------------------------------------------
# pylint: disable=invalid-overridden-method
class ServerListener(Device.Listener, Connection.Listener):
    """Listener that prints connection and disconnection events.

    On each new connection it also installs itself as that connection's
    listener, so the disconnection callback below is invoked for it.
    """

    def __init__(self, device):
        self.device = device

    @AsyncRunner.run_in_task()
    async def on_connection(self, connection):
        print(f'=== Connected to {connection}')
        connection.listener = self

    @AsyncRunner.run_in_task()
    async def on_disconnection(self, reason):
        print(f'### Disconnected, reason={reason}')


# -----------------------------------------------------------------------------
def on_hid_control_point_write(_connection, value):
    """Print the value written to the HID Control Point characteristic."""
    print(f'Control Point Write: {value}')


# -----------------------------------------------------------------------------
def on_report(characteristic, value):
    """Print a received report.

    Args:
        characteristic: label identifying the source of the report (the host
            code in this file passes a descriptive string).
        value: report payload; must provide a `hex()` method.
    """
    print(color('Report:', 'cyan'), value.hex(), 'from', characteristic)


# -----------------------------------------------------------------------------
def _find_characteristics(peer, uuid, label):
    """Return the peer's discovered characteristics matching `uuid`.

    When none is found, an error naming `label` is printed and the empty
    result is returned, so callers can simply test the result for truthiness.
    """
    characteristics = peer.get_characteristics_by_uuid(uuid)
    if not characteristics:
        print(color(f'!!! No {label} characteristic', 'red'))
    return characteristics


# -----------------------------------------------------------------------------
def _key_report(hid_code=0):
    """Build the 8-byte input report used by this example.

    `hid_code` is placed in the third byte; the default of 0 produces the
    all-zero report that is sent when a key is released.
    """
    return bytes([0, 0, hid_code, 0, 0, 0, 0, 0])


# -----------------------------------------------------------------------------
async def keyboard_host(device, peer_address):
    """Run as a keyboard host: connect to a keyboard and print its reports.

    Powers on `device`, connects to and pairs with `peer_address`, discovers
    the HID service and its characteristics, subscribes to every Report
    characteristic that supports notifications, prints the Protocol Mode,
    HID Information and Report Map values, then waits forever.

    Returns early (after printing an error) if the HID service or any of the
    expected characteristics is missing.
    """
    await device.power_on()
    connection = await device.connect(peer_address)
    await connection.pair()
    peer = Peer(connection)
    await peer.discover_service(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)
    hid_services = peer.get_services_by_uuid(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)
    if not hid_services:
        print(color('!!! No HID service', 'red'))
        return
    await peer.discover_characteristics()

    # Look up the characteristics in a fixed order, stopping at the first
    # one that is missing.
    protocol_mode_characteristics = _find_characteristics(
        peer, GATT_PROTOCOL_MODE_CHARACTERISTIC, 'Protocol Mode'
    )
    if not protocol_mode_characteristics:
        return
    protocol_mode_characteristic = protocol_mode_characteristics[0]

    hid_information_characteristics = _find_characteristics(
        peer, GATT_HID_INFORMATION_CHARACTERISTIC, 'HID Information'
    )
    if not hid_information_characteristics:
        return
    hid_information_characteristic = hid_information_characteristics[0]

    report_map_characteristics = _find_characteristics(
        peer, GATT_REPORT_MAP_CHARACTERISTIC, 'Report Map'
    )
    if not report_map_characteristics:
        return
    report_map_characteristic = report_map_characteristics[0]

    # The control point is only checked for presence; it is not used here.
    if not _find_characteristics(
        peer, GATT_HID_CONTROL_POINT_CHARACTERISTIC, 'Control Point'
    ):
        return

    report_characteristics = _find_characteristics(
        peer, GATT_REPORT_CHARACTERISTIC, 'Report'
    )
    if not report_characteristics:
        return

    for i, characteristic in enumerate(report_characteristics):
        print(color('REPORT:', 'yellow'), characteristic)
        if characteristic.properties & Characteristic.Properties.NOTIFY:
            await peer.discover_descriptors(characteristic)
            report_reference_descriptor = characteristic.get_descriptor(
                GATT_REPORT_REFERENCE_DESCRIPTOR
            )
            if report_reference_descriptor:
                report_reference = await peer.read_value(report_reference_descriptor)
                print(color('  Report Reference:', 'blue'), report_reference.hex())
            else:
                report_reference = bytes([0, 0])
            # Bind the label as a default argument so each subscription keeps
            # its own index/reference instead of the loop's last values.
            await peer.subscribe(
                characteristic,
                lambda value, param=f'[{i}] {report_reference.hex()}': on_report(
                    param, value
                ),
            )

    protocol_mode = await peer.read_value(protocol_mode_characteristic)
    print(f'Protocol Mode: {protocol_mode.hex()}')
    hid_information = await peer.read_value(hid_information_characteristic)
    print(f'HID Information: {hid_information.hex()}')
    report_map = await peer.read_value(report_map_characteristic)
    print(f'Report Map: {report_map.hex()}')

    # Wait forever (this future is never resolved)
    await asyncio.get_running_loop().create_future()


# -----------------------------------------------------------------------------
async def keyboard_device(device, command):
    """Run as a keyboard device, advertising a HID-over-GATT keyboard.

    Registers the Device Information, HID and Battery services on `device`,
    sets the advertising data, powers on and starts advertising. Then:

    * if `command` is 'web', starts a WebSocket server on localhost:8989 and
      turns 'keydown'/'keyup' JSON messages into input reports (only the
      keys 'a' to 'z' are handled);
    * otherwise, repeatedly emits the letters of 'hello', one every 3 seconds.

    This coroutine does not return under normal operation.
    """
    # Create an 'input report' characteristic to send keyboard reports to the host
    input_report_characteristic = Characteristic(
        GATT_REPORT_CHARACTERISTIC,
        Characteristic.Properties.READ
        | Characteristic.Properties.WRITE
        | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([0, 0, 0, 0, 0, 0, 0, 0]),
        [
            Descriptor(
                GATT_REPORT_REFERENCE_DESCRIPTOR,
                Descriptor.READABLE,
                bytes([0x01, HID_INPUT_REPORT]),
            )
        ],
    )

    # Create an 'output report' characteristic to receive keyboard reports from the host
    output_report_characteristic = Characteristic(
        GATT_REPORT_CHARACTERISTIC,
        Characteristic.Properties.READ
        | Characteristic.Properties.WRITE
        | Characteristic.WRITE_WITHOUT_RESPONSE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([0]),
        [
            Descriptor(
                GATT_REPORT_REFERENCE_DESCRIPTOR,
                Descriptor.READABLE,
                bytes([0x01, HID_OUTPUT_REPORT]),
            )
        ],
    )

    # Add the services to the GATT server
    device.add_services(
        [
            Service(
                GATT_DEVICE_INFORMATION_SERVICE,
                [
                    Characteristic(
                        GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
                        Characteristic.Properties.READ,
                        Characteristic.READABLE,
                        'Bumble',
                    )
                ],
            ),
            Service(
                GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
                [
                    Characteristic(
                        GATT_PROTOCOL_MODE_CHARACTERISTIC,
                        Characteristic.Properties.READ,
                        Characteristic.READABLE,
                        bytes([HID_REPORT_PROTOCOL]),
                    ),
                    Characteristic(
                        GATT_HID_INFORMATION_CHARACTERISTIC,
                        Characteristic.Properties.READ,
                        Characteristic.READABLE,
                        # bcdHID=1.1, bCountryCode=0x00,
                        # Flags=RemoteWake|NormallyConnectable
                        bytes([0x11, 0x01, 0x00, 0x03]),
                    ),
                    Characteristic(
                        GATT_HID_CONTROL_POINT_CHARACTERISTIC,
                        Characteristic.WRITE_WITHOUT_RESPONSE,
                        Characteristic.WRITEABLE,
                        CharacteristicValue(write=on_hid_control_point_write),
                    ),
                    Characteristic(
                        GATT_REPORT_MAP_CHARACTERISTIC,
                        Characteristic.Properties.READ,
                        Characteristic.READABLE,
                        HID_KEYBOARD_REPORT_MAP,
                    ),
                    input_report_characteristic,
                    output_report_characteristic,
                ],
            ),
            Service(
                GATT_BATTERY_SERVICE,
                [
                    Characteristic(
                        GATT_BATTERY_LEVEL_CHARACTERISTIC,
                        Characteristic.Properties.READ,
                        Characteristic.READABLE,
                        bytes([100]),
                    )
                ],
            ),
        ]
    )

    # Debug print
    for attribute in device.gatt_server.attributes:
        print(attribute)

    # Set the advertising data
    device.advertising_data = bytes(
        AdvertisingData(
            [
                (
                    AdvertisingData.COMPLETE_LOCAL_NAME,
                    bytes('Bumble Keyboard', 'utf-8'),
                ),
                (
                    AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
                    bytes(GATT_HUMAN_INTERFACE_DEVICE_SERVICE),
                ),
                (AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)),
                (AdvertisingData.FLAGS, bytes([0x05])),
            ]
        )
    )

    # Attach a listener
    device.listener = ServerListener(device)

    # Go!
    await device.power_on()
    await device.start_advertising(auto_restart=True)

    async def send_input_report(report):
        # Update the characteristic value, then notify subscribed hosts
        input_report_characteristic.value = report
        await device.notify_subscribers(input_report_characteristic)

    if command == 'web':
        # Start a Websocket server to receive events from a web page.
        # `_path` is optional so the handler works whether the websockets
        # library passes the request path or only the connection.
        async def serve(websocket, _path=None):
            while True:
                try:
                    message = await websocket.recv()
                except websockets.exceptions.ConnectionClosed:
                    # The page went away. Stop serving this connection:
                    # recv() would raise again immediately on every retry.
                    return
                print('Received: ', str(message))

                try:
                    parsed = json.loads(message)
                    message_type = parsed['type']
                except (ValueError, KeyError, TypeError):
                    # Not valid JSON, or not an object with a 'type' entry
                    print(color('!!! Ignoring malformed message', 'red'))
                    continue

                if message_type == 'keydown':
                    # Only deal with keys a to z for now
                    key = parsed.get('key')
                    if isinstance(key, str) and len(key) == 1:
                        code = ord(key)
                        if ord('a') <= code <= ord('z'):
                            hid_code = 0x04 + code - ord('a')
                            await send_input_report(_key_report(hid_code))
                elif message_type == 'keyup':
                    await send_input_report(_key_report())

        # pylint: disable-next=no-member
        await websockets.serve(serve, 'localhost', 8989)
        # Wait forever (this future is never resolved)
        await asyncio.get_running_loop().create_future()
    else:
        message = bytes('hello', 'ascii')
        while True:
            for letter in message:
                await asyncio.sleep(3.0)

                # Keypress for the letter
                keycode = 0x04 + letter - ord('a')
                await send_input_report(_key_report(keycode))

                # Key release
                await send_input_report(_key_report())


# -----------------------------------------------------------------------------
def _print_usage():
    """Print the command-line usage and examples."""
    print(
        'Usage: python keyboard.py <device-config> <transport-spec> <command>'
        ' where <command> is one of:\n'
        ' connect <address> (run a keyboard host, connecting to a keyboard)\n'
        ' web (run a keyboard with keypress input from a web page, '
        'see keyboard.html\n'
    )
    print(
        ' sim (run a keyboard simulation, emitting a canned sequence of keystrokes'
    )
    print('example: python keyboard.py keyboard.json usb:0 sim')
    print(
        'example: python keyboard.py keyboard.json usb:0 connect A0:A1:A2:A3:A4:A5'
    )


# -----------------------------------------------------------------------------
async def main() -> None:
    """Parse the command line and run the selected keyboard role.

    Expects `<device-config> <transport-spec> <command>` in `sys.argv`, plus
    a peer address when the command is 'connect'. Invalid invocations print
    the usage text and return without opening the transport.
    """
    if len(sys.argv) < 4:
        _print_usage()
        return

    # Validate before opening the transport so bad invocations fail fast
    command = sys.argv[3]
    if command not in SUPPORTED_COMMANDS:
        print(color(f'!!! Unknown command: {command}', 'red'))
        _print_usage()
        return
    if command == 'connect' and len(sys.argv) < 5:
        print(color('!!! The connect command requires an <address>', 'red'))
        _print_usage()
        return

    async with await open_transport_or_link(sys.argv[2]) as hci_transport:
        # Create a device to manage the host
        device = Device.from_config_file_with_hci(
            sys.argv[1], hci_transport.source, hci_transport.sink
        )

        if command == 'connect':
            # Run as a Keyboard host
            await keyboard_host(device, sys.argv[4])
        else:
            # Run as a keyboard device ('sim' or 'web')
            await keyboard_device(device, command)


# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())