# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
import struct
import json
import websockets
from bumble.colors import color

from bumble.core import AdvertisingData
from bumble.device import Device, Connection, Peer
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import (
    Descriptor,
    Service,
    Characteristic,
    CharacteristicValue,
    GATT_DEVICE_INFORMATION_SERVICE,
    GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
    GATT_BATTERY_SERVICE,
    GATT_BATTERY_LEVEL_CHARACTERISTIC,
    GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
    GATT_REPORT_CHARACTERISTIC,
    GATT_REPORT_MAP_CHARACTERISTIC,
    GATT_PROTOCOL_MODE_CHARACTERISTIC,
    GATT_HID_INFORMATION_CHARACTERISTIC,
    GATT_HID_CONTROL_POINT_CHARACTERISTIC,
    GATT_REPORT_REFERENCE_DESCRIPTOR,
)

# -----------------------------------------------------------------------------

# Protocol Modes
HID_BOOT_PROTOCOL = 0x00
HID_REPORT_PROTOCOL = 0x01

# Report Types
HID_INPUT_REPORT = 0x01
HID_OUTPUT_REPORT = 0x02
HID_FEATURE_REPORT = 0x03

# Report Map
# Describes one application collection (report ID 1) made of:
#   - input:  8 one-bit fields (usages 0xE0..0xE7), 1 constant byte,
#             then an array of 6 one-byte key codes (usages 0x00..0x94)
#   - output: 5 one-bit LED fields followed by 3 constant padding bits
HID_KEYBOARD_REPORT_MAP = bytes(
    # pylint: disable=line-too-long
    [
        0x05,
        0x01,  # Usage Page (Generic Desktop Controls)
        0x09,
        0x06,  # Usage (Keyboard)
        0xA1,
        0x01,  # Collection (Application)
        0x85,
        0x01,  # . Report ID (1)
        0x05,
        0x07,  # . Usage Page (Keyboard/Keypad)
        0x19,
        0xE0,  # . Usage Minimum (0xE0)
        0x29,
        0xE7,  # . Usage Maximum (0xE7)
        0x15,
        0x00,  # . Logical Minimum (0)
        0x25,
        0x01,  # . Logical Maximum (1)
        0x75,
        0x01,  # . Report Size (1)
        0x95,
        0x08,  # . Report Count (8)
        0x81,
        0x02,  # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
        0x95,
        0x01,  # . Report Count (1)
        0x75,
        0x08,  # . Report Size (8)
        0x81,
        0x01,  # . Input (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
        0x95,
        0x06,  # . Report Count (6)
        0x75,
        0x08,  # . Report Size (8)
        0x15,
        0x00,  # . Logical Minimum (0x00)
        0x25,
        0x94,  # . Logical Maximum (0x94)
        0x05,
        0x07,  # . Usage Page (Keyboard/Keypad)
        0x19,
        0x00,  # . Usage Minimum (0x00)
        0x29,
        0x94,  # . Usage Maximum (0x94)
        0x81,
        0x00,  # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
        0x95,
        0x05,  # . Report Count (5)
        0x75,
        0x01,  # . Report Size (1)
        0x05,
        0x08,  # . Usage Page (LEDs)
        0x19,
        0x01,  # . Usage Minimum (Num Lock)
        0x29,
        0x05,  # . Usage Maximum (Kana)
        0x91,
        0x02,  # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
        0x95,
        0x01,  # . Report Count (1)
        0x75,
        0x03,  # . Report Size (3)
        0x91,
        0x01,  # . Output (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
        0xC0,  # End Collection
    ]
)


# -----------------------------------------------------------------------------
# pylint: disable=invalid-overridden-method
class ServerListener(Device.Listener, Connection.Listener):
    """Listener that logs connection and disconnection events.

    The same object is used as the device listener and, once a connection is
    established, as that connection's listener.
    """

    def __init__(self, device):
        """Remember the device this listener is attached to."""
        self.device = device

    @AsyncRunner.run_in_task()
    async def on_connection(self, connection):
        """Log the new connection and register this object as its listener."""
        print(f'=== Connected to {connection}')
        connection.listener = self

    @AsyncRunner.run_in_task()
    async def on_disconnection(self, reason):
        """Log the disconnection together with its reason."""
        print(f'### Disconnected, reason={reason}')


# -----------------------------------------------------------------------------
def on_hid_control_point_write(_connection, value):
    """Print the value written to the HID Control Point characteristic.

    Used as the `write` callback of the characteristic's value; the connection
    argument is ignored.
    """
    print(f'Control Point Write: {value}')


# -----------------------------------------------------------------------------
def on_report(characteristic, value):
    """Print a received report as hex.

    Args:
        characteristic: what to print as the origin of the report. In this
            file, `keyboard_host` passes a label string made of the report
            index and the report reference in hex.
        value: the report payload (must support `.hex()`).
    """
    print(color('Report:', 'cyan'), value.hex(), 'from', characteristic)


# -----------------------------------------------------------------------------
async def keyboard_host(device, peer_address):
    """Run as a keyboard host: connect to a keyboard and print its reports.

    Powers on the device, connects to and pairs with `peer_address`, discovers
    the HID service and its characteristics, subscribes to every Report
    characteristic that supports notifications, prints the Protocol Mode,
    HID Information and Report Map values, then waits forever so that
    notifications keep being printed.

    Returns early (after printing an error) if the HID service or any of the
    expected characteristics is missing.

    Args:
        device: the local device used to connect.
        peer_address: address of the keyboard to connect to.
    """
    await device.power_on()
    connection = await device.connect(peer_address)
    await connection.pair()
    peer = Peer(connection)
    await peer.discover_service(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)
    hid_services = peer.get_services_by_uuid(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)
    if not hid_services:
        print(color('!!! No HID service', 'red'))
        return
    await peer.discover_characteristics()

    protocol_mode_characteristics = peer.get_characteristics_by_uuid(
        GATT_PROTOCOL_MODE_CHARACTERISTIC
    )
    if not protocol_mode_characteristics:
        print(color('!!! No Protocol Mode characteristic', 'red'))
        return
    protocol_mode_characteristic = protocol_mode_characteristics[0]

    hid_information_characteristics = peer.get_characteristics_by_uuid(
        GATT_HID_INFORMATION_CHARACTERISTIC
    )
    if not hid_information_characteristics:
        print(color('!!! No HID Information characteristic', 'red'))
        return
    hid_information_characteristic = hid_information_characteristics[0]

    report_map_characteristics = peer.get_characteristics_by_uuid(
        GATT_REPORT_MAP_CHARACTERISTIC
    )
    if not report_map_characteristics:
        print(color('!!! No Report Map characteristic', 'red'))
        return
    report_map_characteristic = report_map_characteristics[0]

    # The control point itself is never used below; only its presence is
    # required.
    control_point_characteristics = peer.get_characteristics_by_uuid(
        GATT_HID_CONTROL_POINT_CHARACTERISTIC
    )
    if not control_point_characteristics:
        print(color('!!! No Control Point characteristic', 'red'))
        return

    report_characteristics = peer.get_characteristics_by_uuid(
        GATT_REPORT_CHARACTERISTIC
    )
    if not report_characteristics:
        print(color('!!! No Report characteristic', 'red'))
        return
    for i, characteristic in enumerate(report_characteristics):
        print(color('REPORT:', 'yellow'), characteristic)
        if characteristic.properties & Characteristic.NOTIFY:
            await peer.discover_descriptors(characteristic)
            report_reference_descriptor = characteristic.get_descriptor(
                GATT_REPORT_REFERENCE_DESCRIPTOR
            )
            if report_reference_descriptor:
                report_reference = await peer.read_value(report_reference_descriptor)
                print(color(' Report Reference:', 'blue'), report_reference.hex())
            else:
                # No descriptor: fall back to an all-zero reference for the label
                report_reference = bytes([0, 0])
            # The label is bound as a default argument so that each callback
            # keeps its own index/reference instead of the last loop values.
            await peer.subscribe(
                characteristic,
                lambda value, param=f'[{i}] {report_reference.hex()}': on_report(
                    param, value
                ),
            )

    protocol_mode = await peer.read_value(protocol_mode_characteristic)
    print(f'Protocol Mode: {protocol_mode.hex()}')
    hid_information = await peer.read_value(hid_information_characteristic)
    print(f'HID Information: {hid_information.hex()}')
    report_map = await peer.read_value(report_map_characteristic)
    print(f'Report Map: {report_map.hex()}')

    # Wait on a future that is never resolved, to keep receiving notifications
    await asyncio.get_running_loop().create_future()


# -----------------------------------------------------------------------------
async def keyboard_device(device, command):
    """Run as a keyboard device: expose a HID service and send key reports.

    Registers the Device Information, HID and Battery services, sets the
    advertising data, powers on and starts advertising. Key reports are then
    produced according to `command`:

    - 'web': start a WebSocket server on localhost:8989 and translate the
      'keydown' / 'keyup' JSON messages it receives into input reports
      (only single-character keys 'a' to 'z' are handled).
    - anything else: endlessly type the letters of 'hello', one every
      3 seconds.

    This coroutine does not return in normal operation.

    Args:
        device: the local device that hosts the GATT server.
        command: 'web' for WebSocket-driven input, otherwise the canned
            simulation is run.
    """
    # Create an 'input report' characteristic to send keyboard reports to the host
    input_report_characteristic = Characteristic(
        GATT_REPORT_CHARACTERISTIC,
        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([0, 0, 0, 0, 0, 0, 0, 0]),
        [
            Descriptor(
                GATT_REPORT_REFERENCE_DESCRIPTOR,
                Descriptor.READABLE,
                bytes([0x01, HID_INPUT_REPORT]),
            )
        ],
    )

    # Create an 'output report' characteristic to receive keyboard reports from the host
    output_report_characteristic = Characteristic(
        GATT_REPORT_CHARACTERISTIC,
        Characteristic.READ
        | Characteristic.WRITE
        | Characteristic.WRITE_WITHOUT_RESPONSE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([0]),
        [
            Descriptor(
                GATT_REPORT_REFERENCE_DESCRIPTOR,
                Descriptor.READABLE,
                bytes([0x01, HID_OUTPUT_REPORT]),
            )
        ],
    )

    # Add the services to the GATT server
    device.add_services(
        [
            Service(
                GATT_DEVICE_INFORMATION_SERVICE,
                [
                    Characteristic(
                        GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
                        Characteristic.READ,
                        Characteristic.READABLE,
                        'Bumble',
                    )
                ],
            ),
            Service(
                GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
                [
                    Characteristic(
                        GATT_PROTOCOL_MODE_CHARACTERISTIC,
                        Characteristic.READ,
                        Characteristic.READABLE,
                        bytes([HID_REPORT_PROTOCOL]),
                    ),
                    Characteristic(
                        GATT_HID_INFORMATION_CHARACTERISTIC,
                        Characteristic.READ,
                        Characteristic.READABLE,
                        # bcdHID=1.1, bCountryCode=0x00,
                        # Flags=RemoteWake|NormallyConnectable
                        bytes([0x11, 0x01, 0x00, 0x03]),
                    ),
                    Characteristic(
                        GATT_HID_CONTROL_POINT_CHARACTERISTIC,
                        Characteristic.WRITE_WITHOUT_RESPONSE,
                        Characteristic.WRITEABLE,
                        CharacteristicValue(write=on_hid_control_point_write),
                    ),
                    Characteristic(
                        GATT_REPORT_MAP_CHARACTERISTIC,
                        Characteristic.READ,
                        Characteristic.READABLE,
                        HID_KEYBOARD_REPORT_MAP,
                    ),
                    input_report_characteristic,
                    output_report_characteristic,
                ],
            ),
            Service(
                GATT_BATTERY_SERVICE,
                [
                    Characteristic(
                        GATT_BATTERY_LEVEL_CHARACTERISTIC,
                        Characteristic.READ,
                        Characteristic.READABLE,
                        bytes([100]),
                    )
                ],
            ),
        ]
    )

    # Debug print
    for attribute in device.gatt_server.attributes:
        print(attribute)

    # Set the advertising data
    device.advertising_data = bytes(
        AdvertisingData(
            [
                (
                    AdvertisingData.COMPLETE_LOCAL_NAME,
                    bytes('Bumble Keyboard', 'utf-8'),
                ),
                (
                    AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
                    bytes(GATT_HUMAN_INTERFACE_DEVICE_SERVICE),
                ),
                (AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)),
                (AdvertisingData.FLAGS, bytes([0x05])),
            ]
        )
    )

    # Attach a listener
    device.listener = ServerListener(device)

    # Go!
    await device.power_on()
    await device.start_advertising(auto_restart=True)

    if command == 'web':
        # Start a Websocket server to receive events from a web page.
        # `_path` defaults to None so the handler works both with websockets
        # releases that pass (connection, path) and with those that pass only
        # the connection.
        async def serve(websocket, _path=None):
            while True:
                try:
                    message = await websocket.recv()
                except websockets.exceptions.ConnectionClosedOK:
                    # The client closed the connection cleanly. Stop serving
                    # it: calling recv() again on a closed connection would
                    # only raise the same exception, forever.
                    break
                print('Received: ', str(message))

                parsed = json.loads(message)
                message_type = parsed['type']
                if message_type == 'keydown':
                    # Only deal with keys a to z for now
                    key = parsed['key']
                    if len(key) == 1:
                        code = ord(key)
                        if ord('a') <= code <= ord('z'):
                            # 'a' maps to 0x04, the following letters to the
                            # consecutive codes. The key code goes in the first
                            # slot of the 6-entry key array, after the two
                            # leading bytes of the report.
                            hid_code = 0x04 + code - ord('a')
                            input_report_characteristic.value = bytes(
                                [0, 0, hid_code, 0, 0, 0, 0, 0]
                            )
                            await device.notify_subscribers(
                                input_report_characteristic
                            )
                elif message_type == 'keyup':
                    # An all-zero report means no key is pressed
                    input_report_characteristic.value = bytes.fromhex(
                        '0000000000000000'
                    )
                    await device.notify_subscribers(input_report_characteristic)

        # pylint: disable-next=no-member
        await websockets.serve(serve, 'localhost', 8989)
        # Wait on a future that is never resolved, to keep the server running
        await asyncio.get_running_loop().create_future()
    else:
        message = bytes('hello', 'ascii')
        while True:
            for letter in message:
                await asyncio.sleep(3.0)

                # Keypress for the letter (0x61 is ASCII 'a', mapped to 0x04)
                keycode = 0x04 + letter - 0x61
                input_report_characteristic.value = bytes(
                    [0, 0, keycode, 0, 0, 0, 0, 0]
                )
                await device.notify_subscribers(input_report_characteristic)

                # Key release
                input_report_characteristic.value = bytes.fromhex('0000000000000000')
                await device.notify_subscribers(input_report_characteristic)

# -----------------------------------------------------------------------------
def _print_usage():
    """Print the command-line help for this script."""
    print(
        'Usage: python keyboard.py <device-config> <transport-spec> <command>'
        ' where <command> is one of:\n'
        ' connect <address> (run a keyboard host, connecting to a keyboard)\n'
        ' web (run a keyboard with keypress input from a web page, '
        'see keyboard.html\n'
    )
    print(
        ' sim (run a keyboard simulation, emitting a canned sequence of keystrokes'
    )
    print('example: python keyboard.py keyboard.json usb:0 sim')
    print(
        'example: python keyboard.py keyboard.json usb:0 connect A0:A1:A2:A3:A4:A5'
    )


# -----------------------------------------------------------------------------
async def main():
    """Parse the command line and run the keyboard host or keyboard device.

    Expected arguments: <device-config> <transport-spec> <command>, where
    <command> is 'connect <address>', 'web' or 'sim'.

    The arguments are validated before the transport is opened: if there are
    too few of them, if 'connect' is given without an address, or if the
    command is not recognized, the usage text is printed and the function
    returns without doing anything else.
    """
    if len(sys.argv) < 4:
        _print_usage()
        return

    command = sys.argv[3]
    if command == 'connect':
        # 'connect' needs the peer address as an extra argument; without this
        # check the lookup of sys.argv[4] below would raise IndexError.
        if len(sys.argv) < 5:
            _print_usage()
            return
    elif command not in ('sim', 'web'):
        # Unknown command: report it instead of opening the transport and
        # then silently doing nothing.
        _print_usage()
        return

    async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
        # Create a device to manage the host
        device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)

        if command == 'connect':
            # Run as a Keyboard host
            await keyboard_host(device, sys.argv[4])
        else:
            # Run as a keyboard device ('sim' or 'web')
            await keyboard_device(device, command)


# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())