• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1from __future__ import annotations
2import grpc
3import grpc.aio
4import logging
5
6from pandora_experimental.oob_grpc_aio import OOBServicer
7from pandora_experimental.oob_pb2 import (
8    OobDataRequest,
9    OobDataResponse,
10)
11
12from bumble.smp import OobContext, OobSharedData
13from bumble.pairing import PairingConfig, PairingDelegate
14from bumble.device import Device
15from bumble.pandora import utils
16
17
18# This class implements the Hid Pandora interface.
19class OOBService(OOBServicer):
20
21    def __init__(self, device: Device) -> None:
22        super().__init__()
23        self.log = utils.BumbleServerLoggerAdapter(logging.getLogger(), {'service_name': 'oob', 'device': device})
24        self.device = device
25
26    def configure_oob_pairing(self, peer_oob: OobSharedData) -> str:
27        our_oob_context = OobContext()
28        share_oob = our_oob_context.share().__str__()
29        self.log.debug(f"Local oob data: {share_oob}")
30        oob_contexts = PairingConfig.OobConfig(our_context=our_oob_context, peer_data=peer_oob, legacy_context=None)
31        self.device.pairing_config_factory = lambda connection: PairingConfig(
32            sc=True,
33            mitm=True,
34            bonding=True,
35            oob=oob_contexts,
36        )
37
38        return share_oob
39
40    @utils.rpc
41    async def ShareOobData(self, request: OobDataRequest, context: grpc.ServicerContext) -> OobDataResponse:
42
43        if request.oob:
44            data = str(bytes(request.oob).hex())
45            oob_c, oob_r = data[:len(data) // 2], data[len(data) // 2:]
46            peer_oob = OobSharedData(c=bytearray.fromhex(oob_c), r=bytearray.fromhex(oob_r))
47            self.log.debug(f'peer oob data {peer_oob}')
48        else:
49            peer_oob = None
50        share_oob = self.configure_oob_pairing(peer_oob)
51        # Extract data from string `OOB(C=XXXXXXXXXXXXXXXX, R=YYYYYYYYYYYYYYYY)`
52        extracted_oob = share_oob.strip("OOB()C=").replace(", R=", "")
53        return OobDataResponse(oob=bytes(bytearray.fromhex(extracted_oob)))
54