1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import sys 20import os 21import logging 22 23from bumble.device import Device, Connection 24from bumble.transport import open_transport_or_link 25from bumble.att import ( 26 ATT_Error, 27 ATT_INSUFFICIENT_ENCRYPTION_ERROR 28) 29from bumble.gatt import ( 30 Service, 31 Characteristic, 32 CharacteristicValue, 33 Descriptor, 34 GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR, 35 GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, 36 GATT_DEVICE_INFORMATION_SERVICE 37) 38 39 40# ----------------------------------------------------------------------------- 41class Listener(Device.Listener, Connection.Listener): 42 def __init__(self, device): 43 self.device = device 44 45 def on_connection(self, connection): 46 print(f'=== Connected to {connection}') 47 connection.listener = self 48 49 def on_disconnection(self, reason): 50 print(f'### Disconnected, reason={reason}') 51 52 53def my_custom_read(connection): 54 print('----- READ from', connection) 55 return bytes(f'Hello {connection}', 'ascii') 56 57 58def my_custom_write(connection, value): 59 print(f'----- WRITE from {connection}: {value}') 60 61 62def my_custom_read_with_error(connection): 63 print('----- READ from', connection, '[returning error]') 64 if connection.is_encrypted: 65 return bytes([123]) 66 else: 67 raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR) 68 69 70def my_custom_write_with_error(connection, value): 71 print(f'----- WRITE from {connection}: {value}', '[returning error]') 72 if not connection.is_encrypted: 73 raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR) 74 75 76# ----------------------------------------------------------------------------- 77async def main(): 78 if len(sys.argv) < 3: 79 print('Usage: run_gatt_server.py <device-config> <transport-spec> [<bluetooth-address>]') 80 print('example: run_gatt_server.py device1.json usb:0 E1:CA:72:48:C4:E8') 81 return 82 83 print('<<< connecting to HCI...') 84 async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): 85 print('<<< connected') 86 87 # Create a device to manage the host 88 device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) 89 device.listener = Listener(device) 90 91 # Add a few entries to the device's GATT server 92 descriptor = Descriptor(GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR, Descriptor.READABLE, 'My Description') 93 manufacturer_name_characteristic = Characteristic( 94 GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, 95 Characteristic.READ, 96 Characteristic.READABLE, 97 'Fitbit', 98 [descriptor] 99 ) 100 device_info_service = Service(GATT_DEVICE_INFORMATION_SERVICE, [ 101 manufacturer_name_characteristic 102 ]) 103 custom_service1 = Service( 104 '50DB505C-8AC4-4738-8448-3B1D9CC09CC5', 105 [ 106 Characteristic( 107 'D901B45B-4916-412E-ACCA-376ECB603B2C', 108 Characteristic.READ | Characteristic.WRITE, 109 Characteristic.READABLE | Characteristic.WRITEABLE, 110 CharacteristicValue(read=my_custom_read, write=my_custom_write) 111 ), 112 Characteristic( 113 '552957FB-CF1F-4A31-9535-E78847E1A714', 114 Characteristic.READ | Characteristic.WRITE, 115 Characteristic.READABLE | Characteristic.WRITEABLE, 116 CharacteristicValue(read=my_custom_read_with_error, write=my_custom_write_with_error) 117 ), 118 Characteristic( 119 '486F64C6-4B5F-4B3B-8AFF-EDE134A8446A', 120 Characteristic.READ | Characteristic.NOTIFY, 121 Characteristic.READABLE, 122 'hello' 123 ) 124 ] 125 ) 126 device.add_services([device_info_service, custom_service1]) 127 128 # Debug print 129 for attribute in device.gatt_server.attributes: 130 print(attribute) 131 132 # Get things going 133 await device.power_on() 134 135 # Connect to a peer 136 if len(sys.argv) > 3: 137 target_address = sys.argv[3] 138 print(f'=== Connecting to {target_address}...') 139 await device.connect(target_address) 140 else: 141 await device.start_advertising(auto_restart=True) 142 143 await hci_source.wait_for_termination() 144 145# ----------------------------------------------------------------------------- 146logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) 147asyncio.run(main()) 148