# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging

from bumble.device import Device, Connection
from bumble.transport import open_transport_or_link
from bumble.att import ATT_Error, ATT_INSUFFICIENT_ENCRYPTION_ERROR
from bumble.gatt import (
    Service,
    Characteristic,
    CharacteristicValue,
    Descriptor,
    GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
    GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
    GATT_DEVICE_INFORMATION_SERVICE,
)


# -----------------------------------------------------------------------------
class Listener(Device.Listener, Connection.Listener):
    """Print connection and disconnection events for a device."""

    def __init__(self, device):
        self.device = device

    def on_connection(self, connection):
        """Announce the new connection and register this object as its listener."""
        print(f'=== Connected to {connection}')
        # Registering on the connection is what routes on_disconnection() here.
        connection.listener = self

    def on_disconnection(self, reason):
        """Announce a disconnection together with the reported reason."""
        print(f'### Disconnected, reason={reason}')


def my_custom_read(connection):
    """Log the read and return an ASCII greeting that embeds the connection."""
    print('----- READ from', connection)
    greeting = f'Hello {connection}'
    return greeting.encode('ascii')


def my_custom_write(connection, value):
    """Log the written value; nothing is stored."""
    print(f'----- WRITE from {connection}: {value}')


def my_custom_read_with_error(connection):
    """Return a one-byte value when the link is encrypted.

    Raises:
        ATT_Error: with ATT_INSUFFICIENT_ENCRYPTION_ERROR when the connection
            is not encrypted.
    """
    print('----- READ from', connection, '[returning error]')
    if not connection.is_encrypted:
        raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR)

    return bytes([123])


def my_custom_write_with_error(connection, value):
    """Log the write and reject it when the link is not encrypted.

    Raises:
        ATT_Error: with ATT_INSUFFICIENT_ENCRYPTION_ERROR when the connection
            is not encrypted.
    """
    print(f'----- WRITE from {connection}: {value}', '[returning error]')
    if connection.is_encrypted:
        return

    raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR)


# -----------------------------------------------------------------------------
def _build_device_info_service():
    """Build the Device Information service with a described manufacturer name."""
    user_description = Descriptor(
        GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
        Descriptor.READABLE,
        'My Description',
    )
    manufacturer_name = Characteristic(
        GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
        Characteristic.READ,
        Characteristic.READABLE,
        'Fitbit',
        [user_description],
    )
    return Service(GATT_DEVICE_INFORMATION_SERVICE, [manufacturer_name])


def _build_custom_service():
    """Build the custom service: two callback-backed values and a static one."""
    read_write_properties = Characteristic.READ | Characteristic.WRITE
    read_write_permissions = Characteristic.READABLE | Characteristic.WRITEABLE

    plain_characteristic = Characteristic(
        'D901B45B-4916-412E-ACCA-376ECB603B2C',
        read_write_properties,
        read_write_permissions,
        CharacteristicValue(read=my_custom_read, write=my_custom_write),
    )
    # Same shape as above, but the callbacks raise an ATT error when the
    # connection is not encrypted.
    guarded_characteristic = Characteristic(
        '552957FB-CF1F-4A31-9535-E78847E1A714',
        read_write_properties,
        read_write_permissions,
        CharacteristicValue(
            read=my_custom_read_with_error, write=my_custom_write_with_error
        ),
    )
    static_characteristic = Characteristic(
        '486F64C6-4B5F-4B3B-8AFF-EDE134A8446A',
        Characteristic.READ | Characteristic.NOTIFY,
        Characteristic.READABLE,
        'hello',
    )
    return Service(
        '50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
        [plain_characteristic, guarded_characteristic, static_characteristic],
    )


# -----------------------------------------------------------------------------
async def main():
    """Run the example GATT server.

    Command line: <device-config> <transport-spec> [<bluetooth-address>].
    When an address is given the device connects to that peer; otherwise it
    starts advertising. The coroutine then waits for the HCI source to
    terminate.
    """
    if len(sys.argv) < 3:
        print(
            'Usage: run_gatt_server.py <device-config> <transport-spec> '
            '[<bluetooth-address>]'
        )
        print('example: run_gatt_server.py device1.json usb:0 E1:CA:72:48:C4:E8')
        return

    config_path, transport_spec = sys.argv[1], sys.argv[2]
    peer_address = sys.argv[3] if len(sys.argv) > 3 else None

    print('<<< connecting to HCI...')
    async with await open_transport_or_link(transport_spec) as (hci_source, hci_sink):
        print('<<< connected')

        # Create a device to manage the host
        device = Device.from_config_file_with_hci(config_path, hci_source, hci_sink)
        device.listener = Listener(device)

        # Add a few entries to the device's GATT server
        device.add_services([_build_device_info_service(), _build_custom_service()])

        # Debug print
        for entry in device.gatt_server.attributes:
            print(entry)

        # Get things going
        await device.power_on()

        # Connect to a peer if one was named, otherwise wait to be found
        if peer_address is not None:
            print(f'=== Connecting to {peer_address}...')
            await device.connect(peer_address)
        else:
            await device.start_advertising(auto_restart=True)

        await hci_source.wait_for_termination()


# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())