1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import sys 20import os 21import logging 22import struct 23import websockets 24import json 25from colors import color 26 27from bumble.core import AdvertisingData 28from bumble.device import Device, Connection, Peer 29from bumble.utils import AsyncRunner 30from bumble.transport import open_transport_or_link 31from bumble.gatt import ( 32 Descriptor, 33 Service, 34 Characteristic, 35 CharacteristicValue, 36 GATT_DEVICE_INFORMATION_SERVICE, 37 GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE, 38 GATT_DEVICE_BATTERY_SERVICE, 39 GATT_BATTERY_LEVEL_CHARACTERISTIC, 40 GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, 41 GATT_REPORT_CHARACTERISTIC, 42 GATT_REPORT_MAP_CHARACTERISTIC, 43 GATT_PROTOCOL_MODE_CHARACTERISTIC, 44 GATT_HID_INFORMATION_CHARACTERISTIC, 45 GATT_HID_CONTROL_POINT_CHARACTERISTIC, 46 GATT_REPORT_REFERENCE_DESCRIPTOR 47) 48 49# ----------------------------------------------------------------------------- 50 51# Protocol Modes 52HID_BOOT_PROTOCOL = 0x00 53HID_REPORT_PROTOCOL = 0x01 54 55# Report Types 56HID_INPUT_REPORT = 0x01 57HID_OUTPUT_REPORT = 0x02 58HID_FEATURE_REPORT = 0x03 59 60# Report Map 61HID_KEYBOARD_REPORT_MAP = bytes([ 62 0x05, 0x01, # Usage Page (Generic Desktop Ctrls) 63 0x09, 0x06, # Usage (Keyboard) 64 0xA1, 0x01, # Collection (Application) 65 0x85, 0x01, # . Report ID (1) 66 0x05, 0x07, # . Usage Page (Kbrd/Keypad) 67 0x19, 0xE0, # . Usage Minimum (0xE0) 68 0x29, 0xE7, # . Usage Maximum (0xE7) 69 0x15, 0x00, # . Logical Minimum (0) 70 0x25, 0x01, # . Logical Maximum (1) 71 0x75, 0x01, # . Report Size (1) 72 0x95, 0x08, # . Report Count (8) 73 0x81, 0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position) 74 0x95, 0x01, # . Report Count (1) 75 0x75, 0x08, # . Report Size (8) 76 0x81, 0x01, # . Input (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position) 77 0x95, 0x06, # . Report Count (6) 78 0x75, 0x08, # . Report Size (8) 79 0x15, 0x00, # . Logical Minimum (0x00) 80 0x25, 0x94, # . Logical Maximum (0x94) 81 0x05, 0x07, # . Usage Page (Kbrd/Keypad) 82 0x19, 0x00, # . Usage Minimum (0x00) 83 0x29, 0x94, # . Usage Maximum (0x94) 84 0x81, 0x00, # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position) 85 0x95, 0x05, # . Report Count (5) 86 0x75, 0x01, # . Report Size (1) 87 0x05, 0x08, # . Usage Page (LEDs) 88 0x19, 0x01, # . Usage Minimum (Num Lock) 89 0x29, 0x05, # . Usage Maximum (Kana) 90 0x91, 0x02, # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile) 91 0x95, 0x01, # . Report Count (1) 92 0x75, 0x03, # . Report Size (3) 93 0x91, 0x01, # . Output (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile) 94 0xC0 # End Collection 95]) 96 97 98# ----------------------------------------------------------------------------- 99class ServerListener(Device.Listener, Connection.Listener): 100 def __init__(self, device): 101 self.device = device 102 103 @AsyncRunner.run_in_task() 104 async def on_connection(self, connection): 105 print(f'=== Connected to {connection}') 106 connection.listener = self 107 108 @AsyncRunner.run_in_task() 109 async def on_disconnection(self, reason): 110 print(f'### Disconnected, reason={reason}') 111 112 113# ----------------------------------------------------------------------------- 114def on_hid_control_point_write(connection, value): 115 print(f'Control Point Write: {value}') 116 117 118# ----------------------------------------------------------------------------- 119def on_report(characteristic, value): 120 print(color('Report:', 'cyan'), value.hex(), 'from', characteristic) 121 122 123# ----------------------------------------------------------------------------- 124async def keyboard_host(device, peer_address): 125 await device.power_on() 126 connection = await device.connect(peer_address) 127 await connection.pair() 128 peer = Peer(connection) 129 await peer.discover_service(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE) 130 hid_services = peer.get_services_by_uuid(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE) 131 if not hid_services: 132 print(color('!!! No HID service', 'red')) 133 return 134 await peer.discover_characteristics() 135 136 protocol_mode_characteristics = peer.get_characteristics_by_uuid(GATT_PROTOCOL_MODE_CHARACTERISTIC) 137 if not protocol_mode_characteristics: 138 print(color('!!! No Protocol Mode characteristic', 'red')) 139 return 140 protocol_mode_characteristic = protocol_mode_characteristics[0] 141 142 hid_information_characteristics = peer.get_characteristics_by_uuid(GATT_HID_INFORMATION_CHARACTERISTIC) 143 if not hid_information_characteristics: 144 print(color('!!! No HID Information characteristic', 'red')) 145 return 146 hid_information_characteristic = hid_information_characteristics[0] 147 148 report_map_characteristics = peer.get_characteristics_by_uuid(GATT_REPORT_MAP_CHARACTERISTIC) 149 if not report_map_characteristics: 150 print(color('!!! No Report Map characteristic', 'red')) 151 return 152 report_map_characteristic = report_map_characteristics[0] 153 154 control_point_characteristics = peer.get_characteristics_by_uuid(GATT_HID_CONTROL_POINT_CHARACTERISTIC) 155 if not control_point_characteristics: 156 print(color('!!! No Control Point characteristic', 'red')) 157 return 158 # control_point_characteristic = control_point_characteristics[0] 159 160 report_characteristics = peer.get_characteristics_by_uuid(GATT_REPORT_CHARACTERISTIC) 161 if not report_characteristics: 162 print(color('!!! No Report characteristic', 'red')) 163 return 164 for i, characteristic in enumerate(report_characteristics): 165 print(color('REPORT:', 'yellow'), characteristic) 166 if characteristic.properties & Characteristic.NOTIFY: 167 await peer.discover_descriptors(characteristic) 168 report_reference_descriptor = characteristic.get_descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR) 169 if report_reference_descriptor: 170 report_reference = await peer.read_value(report_reference_descriptor) 171 print(color(' Report Reference:', 'blue'), report_reference.hex()) 172 else: 173 report_reference = bytes([0, 0]) 174 await peer.subscribe(characteristic, lambda value, param=f'[{i}] {report_reference.hex()}': on_report(param, value)) 175 176 protocol_mode = await peer.read_value(protocol_mode_characteristic) 177 print(f'Protocol Mode: {protocol_mode.hex()}') 178 hid_information = await peer.read_value(hid_information_characteristic) 179 print(f'HID Information: {hid_information.hex()}') 180 report_map = await peer.read_value(report_map_characteristic) 181 print(f'Report Map: {report_map.hex()}') 182 183 await asyncio.get_running_loop().create_future() 184 185 186# ----------------------------------------------------------------------------- 187async def keyboard_device(device, command): 188 # Create an 'input report' characteristic to send keyboard reports to the host 189 input_report_characteristic = Characteristic( 190 GATT_REPORT_CHARACTERISTIC, 191 Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, 192 Characteristic.READABLE | Characteristic.WRITEABLE, 193 bytes([0, 0, 0, 0, 0, 0, 0, 0]), 194 [ 195 Descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR, Descriptor.READABLE, bytes([0x01, HID_INPUT_REPORT])) 196 ] 197 ) 198 199 # Create an 'output report' characteristic to receive keyboard reports from the host 200 output_report_characteristic = Characteristic( 201 GATT_REPORT_CHARACTERISTIC, 202 Characteristic.READ | Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE, 203 Characteristic.READABLE | Characteristic.WRITEABLE, 204 bytes([0]), 205 [ 206 Descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR, Descriptor.READABLE, bytes([0x01, HID_OUTPUT_REPORT])) 207 ] 208 ) 209 210 # Add the services to the GATT sever 211 device.add_services([ 212 Service( 213 GATT_DEVICE_INFORMATION_SERVICE, 214 [ 215 Characteristic( 216 GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, 217 Characteristic.READ, 218 Characteristic.READABLE, 219 'Bumble' 220 ) 221 ] 222 ), 223 Service( 224 GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE, 225 [ 226 Characteristic( 227 GATT_PROTOCOL_MODE_CHARACTERISTIC, 228 Characteristic.READ, 229 Characteristic.READABLE, 230 bytes([HID_REPORT_PROTOCOL]) 231 ), 232 Characteristic( 233 GATT_HID_INFORMATION_CHARACTERISTIC, 234 Characteristic.READ, 235 Characteristic.READABLE, 236 bytes([0x11, 0x01, 0x00, 0x03]) # bcdHID=1.1, bCountryCode=0x00, Flags=RemoteWake|NormallyConnectable 237 ), 238 Characteristic( 239 GATT_HID_CONTROL_POINT_CHARACTERISTIC, 240 Characteristic.WRITE_WITHOUT_RESPONSE, 241 Characteristic.WRITEABLE, 242 CharacteristicValue(write=on_hid_control_point_write) 243 ), 244 Characteristic( 245 GATT_REPORT_MAP_CHARACTERISTIC, 246 Characteristic.READ, 247 Characteristic.READABLE, 248 HID_KEYBOARD_REPORT_MAP 249 ), 250 input_report_characteristic, 251 output_report_characteristic 252 ] 253 ), 254 Service( 255 GATT_DEVICE_BATTERY_SERVICE, 256 [ 257 Characteristic( 258 GATT_BATTERY_LEVEL_CHARACTERISTIC, 259 Characteristic.READ, 260 Characteristic.READABLE, 261 bytes([100]) 262 ) 263 ] 264 ) 265 ]) 266 267 # Debug print 268 for attribute in device.gatt_server.attributes: 269 print(attribute) 270 271 # Set the advertising data 272 device.advertising_data = bytes( 273 AdvertisingData([ 274 (AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Keyboard', 'utf-8')), 275 (AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, 276 bytes(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)), 277 (AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)), 278 (AdvertisingData.FLAGS, bytes([0x05])) 279 ]) 280 ) 281 282 # Attach a listener 283 device.listener = ServerListener(device) 284 285 # Go! 286 await device.power_on() 287 await device.start_advertising(auto_restart=True) 288 289 if command == 'web': 290 # Start a Websocket server to receive events from a web page 291 async def serve(websocket, path): 292 while True: 293 try: 294 message = await websocket.recv() 295 print('Received: ', str(message)) 296 297 parsed = json.loads(message) 298 message_type = parsed['type'] 299 if message_type == 'keydown': 300 # Only deal with keys a to z for now 301 key = parsed['key'] 302 if len(key) == 1: 303 code = ord(key) 304 if code >= ord('a') and code <= ord('z'): 305 hid_code = 0x04 + code - ord('a') 306 input_report_characteristic.value = bytes([0, 0, hid_code, 0, 0, 0, 0, 0]) 307 await device.notify_subscribers(input_report_characteristic) 308 elif message_type == 'keyup': 309 input_report_characteristic.value = bytes.fromhex('0000000000000000') 310 await device.notify_subscribers(input_report_characteristic) 311 312 except websockets.exceptions.ConnectionClosedOK: 313 pass 314 await websockets.serve(serve, 'localhost', 8989) 315 await asyncio.get_event_loop().create_future() 316 else: 317 message = bytes('hello', 'ascii') 318 while True: 319 for letter in message: 320 await asyncio.sleep(3.0) 321 322 # Keypress for the letter 323 keycode = 0x04 + letter - 0x61 324 input_report_characteristic.value = bytes([0, 0, keycode, 0, 0, 0, 0, 0]) 325 await device.notify_subscribers(input_report_characteristic) 326 327 # Key release 328 input_report_characteristic.value = bytes.fromhex('0000000000000000') 329 await device.notify_subscribers(input_report_characteristic) 330 331 332# ----------------------------------------------------------------------------- 333async def main(): 334 if len(sys.argv) < 4: 335 print('Usage: python keyboard.py <device-config> <transport-spec> <command>') 336 print(' where <command> is one of:') 337 print(' connect <address> (run a keyboard host, connecting to a keyboard)') 338 print(' web (run a keyboard with keypress input from a web page, see keyboard.html') 339 print(' sim (run a keyboard simulation, emitting a canned sequence of keystrokes') 340 print('example: python keyboard.py keyboard.json usb:0 sim') 341 print('example: python keyboard.py keyboard.json usb:0 connect A0:A1:A2:A3:A4:A5') 342 return 343 344 async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): 345 # Create a device to manage the host 346 device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) 347 348 command = sys.argv[3] 349 if command == 'connect': 350 # Run as a Keyboard host 351 await keyboard_host(device, sys.argv[4]) 352 elif command in {'sim', 'web'}: 353 # Run as a keyboard device 354 await keyboard_device(device, command) 355 356 357# ----------------------------------------------------------------------------- 358logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) 359asyncio.run(main()) 360