# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import functools
import grpc
import inspect
import logging

from bumble.device import Device
from bumble.hci import Address
from google.protobuf.message import Message  # pytype: disable=pyi-error
from typing import Any, Generator, MutableMapping, Optional, Tuple

# Address type constant to use for each supported request field name.
ADDRESS_TYPES: dict[str, int] = {
    "public": Address.PUBLIC_DEVICE_ADDRESS,
    "random": Address.RANDOM_DEVICE_ADDRESS,
    "public_identity": Address.PUBLIC_IDENTITY_ADDRESS,
    "random_static_identity": Address.RANDOM_IDENTITY_ADDRESS,
}


def address_from_request(request: Message, field: Optional[str]) -> Address:
    """Build an `Address` from the named field of a request message.

    Args:
        request: Message holding the address bytes in the attribute `field`.
        field: Name of the attribute to read; it is also the key used to look
            up the address type in `ADDRESS_TYPES`. `None` means no address
            was provided.

    Returns:
        `Address.ANY` when `field` is `None`, otherwise an `Address` built
        from the field's bytes in reversed order.

    Raises:
        KeyError: If `field` is not a key of `ADDRESS_TYPES`.
    """
    if field is not None:
        raw = getattr(request, field)
        # The byte order is flipped before handing the value to `Address`
        # (presumably the request and Bumble use opposite orders - confirm).
        return Address(bytes(reversed(raw)), ADDRESS_TYPES[field])
    return Address.ANY


class BumbleServerLoggerAdapter(logging.LoggerAdapter):  # type: ignore
    """Prefixes log messages with `[bumble.<service_name>:<addr>]`.

    The adapter's `extra` mapping must provide a `'service_name'` string and a
    `'device'` entry holding a Bumble `Device`.
    """

    def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> Tuple[str, MutableMapping[str, Any]]:
        """Return `msg` with the service/device prefix and `kwargs` untouched."""
        extra = self.extra
        assert extra
        service_name = extra['service_name']
        assert isinstance(service_name, str)
        device = extra['device']
        assert isinstance(device, Device)
        flipped = bytes(device.public_address)[::-1]  # pytype: disable=attribute-error
        # Only the bytes after the first four of the flipped address are shown.
        addr = ':'.join(f'{octet:02X}' for octet in flipped[4:])
        prefix = f'[bumble.{service_name}:{addr}]'
        return f'{prefix} {msg}', kwargs

@contextlib.contextmanager
def exception_to_rpc_error(context: grpc.ServicerContext) -> Generator[None, None, None]:
    """Report selected exceptions raised in the `with` body as gRPC statuses.

    The exception is swallowed after the status code and details (the
    exception's string form) have been set on `context`:

    * `NotImplementedError` -> `UNIMPLEMENTED`
    * `ValueError` -> `INVALID_ARGUMENT`
    * `RuntimeError` -> `ABORTED`

    Any other exception propagates unchanged.
    """
    try:
        yield None
    # NotImplementedError subclasses RuntimeError, so it must be handled first.
    except NotImplementedError as e:
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details(str(e))  # type: ignore
    except ValueError as e:
        context.set_code(grpc.StatusCode.INVALID_ARGUMENT)  # type: ignore
        context.set_details(str(e))  # type: ignore
    except RuntimeError as e:
        context.set_code(grpc.StatusCode.ABORTED)  # type: ignore
        context.set_details(str(e))  # type: ignore


def rpc(func: Any) -> Any:
    """Decorate an RPC servicer method so its exceptions become gRPC errors.

    The wrapper matching the kind of `func` (async generator, coroutine,
    generator or plain function) is returned, and every wrapper runs `func`
    inside `exception_to_rpc_error`.

    Args:
        func: Servicer method taking `(self, request, context)`.

    Returns:
        A wrapper of the same kind as `func`.
    """

    @functools.wraps(func)
    async def asyncgen_wrapper(self: Any, request: Any, context: grpc.ServicerContext) -> Any:
        with exception_to_rpc_error(context):
            async for v in func(self, request, context):
                yield v

    @functools.wraps(func)
    async def async_wrapper(self: Any, request: Any, context: grpc.ServicerContext) -> Any:
        with exception_to_rpc_error(context):
            return await func(self, request, context)

    @functools.wraps(func)
    def gen_wrapper(self: Any, request: Any, context: grpc.ServicerContext) -> Any:
        with exception_to_rpc_error(context):
            yield from func(self, request, context)

    @functools.wraps(func)
    def wrapper(self: Any, request: Any, context: grpc.ServicerContext) -> Any:
        with exception_to_rpc_error(context):
            return func(self, request, context)

    if inspect.isasyncgenfunction(func):
        return asyncgen_wrapper

    if inspect.iscoroutinefunction(func):
        return async_wrapper

    # `inspect.isgenerator` only matches generator *objects* and is always
    # False for a function; `isgeneratorfunction` is the correct check, so
    # that exceptions raised while iterating are translated as well.
    if inspect.isgeneratorfunction(func):
        return gen_wrapper

    return wrapper