• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import argparse
16import asyncio
17import json
18import logging
19from typing import Any, Dict
20
21from bumble import pandora as bumble_server
22from bumble.pandora import Config, PandoraDevice, serve
23from bumble.rfcomm import Server as RFCOMMServer
24from bumble_experimental.asha import AshaService
25from bumble_experimental.avrcp import AvrcpService
26from bumble_experimental.bumble_config import BumbleConfigService
27from bumble_experimental.dck import DckService
28from bumble_experimental.gatt import GATTService
29from bumble_experimental.hid import HIDService
30from bumble_experimental.oob import OOBService
31from bumble_experimental.opp import OppService
32from bumble_experimental.rfcomm import RFCOMMService
33from pandora_experimental.asha_grpc_aio import add_AshaServicer_to_server
34from pandora_experimental.avrcp_grpc_aio import add_AVRCPServicer_to_server
35from pandora_experimental.bumble_config_grpc_aio import \
36    add_BumbleConfigServicer_to_server
37from pandora_experimental.dck_grpc_aio import add_DckServicer_to_server
38from pandora_experimental.gatt_grpc_aio import add_GATTServicer_to_server
39from pandora_experimental.hid_grpc_aio import add_HIDServicer_to_server
40from pandora_experimental.oob_grpc_aio import add_OOBServicer_to_server
41from pandora_experimental.opp_grpc_aio import add_OppServicer_to_server
42from pandora_experimental.rfcomm_grpc_aio import add_RFCOMMServicer_to_server
43
44BUMBLE_SERVER_GRPC_PORT = 7999
45ROOTCANAL_PORT_CUTTLEFISH = 7300
46
47
48def main(grpc_port: int, rootcanal_port: int, transport: str, config: str) -> None:
49    register_experimental_services()
50    if '<rootcanal-port>' in transport:
51        transport = transport.replace('<rootcanal-port>', str(rootcanal_port))
52
53    bumble_config = retrieve_config(config)
54    bumble_config.setdefault('transport', transport)
55    device = PandoraDevice(bumble_config)
56
57    server_config = Config()
58    server_config.load_from_dict(bumble_config.get('server', {}))
59
60    logging.basicConfig(level=logging.DEBUG,
61                        format='%(asctime)s.%(msecs).03d %(levelname)-8s %(message)s',
62                        datefmt='%m-%d %H:%M:%S')
63    asyncio.run(serve(device, config=server_config, port=grpc_port))
64
65
66def args_parser() -> argparse.ArgumentParser:
67    parser = argparse.ArgumentParser(description="Bumble command-line tool")
68
69    parser.add_argument('--grpc-port', type=int, default=BUMBLE_SERVER_GRPC_PORT, help='gRPC port to serve')
70    parser.add_argument('--rootcanal-port', type=int, default=ROOTCANAL_PORT_CUTTLEFISH, help='Rootcanal TCP port')
71    parser.add_argument('--transport',
72                        type=str,
73                        default='tcp-client:127.0.0.1:<rootcanal-port>',
74                        help='HCI transport (default: tcp-client:127.0.0.1:<rootcanal-port>)')
75    parser.add_argument('--config', type=str, help='Bumble json configuration file')
76
77    return parser
78
79
80def register_rfcomm_dependent_servicers(bumble, _, server) -> None:
81    rfcomm_server = RFCOMMServer(bumble.device)
82    add_RFCOMMServicer_to_server(RFCOMMService(bumble.device, rfcomm_server), server)
83    add_OppServicer_to_server(OppService(bumble.device, rfcomm_server), server)
84
85
86def register_experimental_services() -> None:
87    bumble_server.register_servicer_hook(
88        lambda bumble, _, server: add_AVRCPServicer_to_server(AvrcpService(bumble.device), server))
89    bumble_server.register_servicer_hook(
90        lambda bumble, _, server: add_AshaServicer_to_server(AshaService(bumble.device), server))
91    bumble_server.register_servicer_hook(
92        lambda bumble, _, server: add_DckServicer_to_server(DckService(bumble.device), server))
93    bumble_server.register_servicer_hook(
94        lambda bumble, _, server: add_GATTServicer_to_server(GATTService(bumble.device), server))
95    bumble_server.register_servicer_hook(register_rfcomm_dependent_servicers)
96    bumble_server.register_servicer_hook(
97        lambda bumble, _, server: add_HIDServicer_to_server(HIDService(bumble.device), server))
98    bumble_server.register_servicer_hook(
99        lambda bumble, _, server: add_OOBServicer_to_server(OOBService(bumble.device), server))
100    bumble_server.register_servicer_hook(lambda bumble, config, server: add_BumbleConfigServicer_to_server(
101        BumbleConfigService(bumble.device, config), server))
102
103
104def retrieve_config(config: str) -> Dict[str, Any]:
105    if not config:
106        return {}
107
108    with open(config, 'r') as f:
109        return json.load(f)
110
111
112if __name__ == '__main__':
113    args = args_parser().parse_args()
114    main(**vars(args))
115