1from __future__ import annotations 2import grpc 3import grpc.aio 4import logging 5 6from pandora_experimental.oob_grpc_aio import OOBServicer 7from pandora_experimental.oob_pb2 import ( 8 OobDataRequest, 9 OobDataResponse, 10) 11 12from bumble.smp import OobContext, OobSharedData 13from bumble.pairing import PairingConfig, PairingDelegate 14from bumble.device import Device 15from bumble.pandora import utils 16 17 18# This class implements the Hid Pandora interface. 19class OOBService(OOBServicer): 20 21 def __init__(self, device: Device) -> None: 22 super().__init__() 23 self.log = utils.BumbleServerLoggerAdapter(logging.getLogger(), {'service_name': 'oob', 'device': device}) 24 self.device = device 25 26 def configure_oob_pairing(self, peer_oob: OobSharedData) -> str: 27 our_oob_context = OobContext() 28 share_oob = our_oob_context.share().__str__() 29 self.log.debug(f"Local oob data: {share_oob}") 30 oob_contexts = PairingConfig.OobConfig(our_context=our_oob_context, peer_data=peer_oob, legacy_context=None) 31 self.device.pairing_config_factory = lambda connection: PairingConfig( 32 sc=True, 33 mitm=True, 34 bonding=True, 35 oob=oob_contexts, 36 ) 37 38 return share_oob 39 40 @utils.rpc 41 async def ShareOobData(self, request: OobDataRequest, context: grpc.ServicerContext) -> OobDataResponse: 42 43 if request.oob: 44 data = str(bytes(request.oob).hex()) 45 oob_c, oob_r = data[:len(data) // 2], data[len(data) // 2:] 46 peer_oob = OobSharedData(c=bytearray.fromhex(oob_c), r=bytearray.fromhex(oob_r)) 47 self.log.debug(f'peer oob data {peer_oob}') 48 else: 49 peer_oob = None 50 share_oob = self.configure_oob_pairing(peer_oob) 51 # Extract data from string `OOB(C=XXXXXXXXXXXXXXXX, R=YYYYYYYYYYYYYYYY)` 52 extracted_oob = share_oob.strip("OOB()C=").replace(", R=", "") 53 return OobDataResponse(oob=bytes(bytearray.fromhex(extracted_oob))) 54