"""A generic gRPC mock device controller.

Example MH testbed config for Hostside:
- name: GrpcBtSyncStub-1
  devices:
  - type: MiscTestbedSubDevice
    dimensions:
      mobly_type: DerivedBtDevice
    properties:
      ModuleName: grpc_bt_sync_mock
      ClassName: GrpcBtSyncMock
      Params:
        config:
          mac_address: FE:ED:BE:EF:CA:FE
  dimensions:
    device: GrpcBtSyncStub
"""

import subprocess
from typing import Any, Dict, Optional, Tuple

from absl import flags
from absl import logging
import grpc

# Internal import
from blueberry.grpc.proto import blueberry_device_controller_pb2
from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc


FLAGS = flags.FLAGS
flags.DEFINE_string('server', 'dns:///[::1]:10000', 'server address')


class GrpcBtSyncMock(object):
  """Generic GRPC device controller.

  Attributes:
    config: The configuration dict passed to the constructor.
    mac_address: Value of the 'mac_address' entry of `config`.
    server_proc: The gRPC server subprocess started by `setup`, or None if
      `setup` has not been called.
    channel_creds: Channel credentials created by `setup`, or None.
    channel: The gRPC channel opened by `setup`, or None.
    stub: The BlueberryDeviceController stub created by `setup`, or None.
  """

  def __init__(self, config: Dict[str, str]) -> None:
    """Initialize GRPC object.

    Args:
      config: Device configuration; must contain a 'mac_address' entry.

    Raises:
      KeyError: If `config` has no 'mac_address' entry.
    """
    super(GrpcBtSyncMock, self).__init__()
    self.config = config
    self.mac_address = config['mac_address']
    # Populated by setup(). Declared here so that cleanup and early calls see
    # a well-defined (None) value instead of a missing attribute.
    self.server_proc = None
    self.channel_creds = None
    self.channel = None
    self.stub = None
    # Populated by set_target().
    self._target_device = None

  def __del__(self) -> None:
    """Terminate the server subprocess and drop the gRPC references."""
    # __del__ also runs for objects whose __init__ or setup() failed part-way,
    # so every attribute is looked up defensively.
    server_proc = getattr(self, 'server_proc', None)
    if server_proc is not None:
      server_proc.terminate()
    for attr_name in ('channel_creds', 'channel', 'stub'):
      self.__dict__.pop(attr_name, None)

  def setup(self) -> None:
    """Setup the gRPC server that the sync mock will respond to.

    Starts the server binary as a subprocess, opens a secure channel to the
    address given by the `--server` flag, blocks until the channel is ready
    and creates the device controller stub.
    """
    # pytype: disable=attribute-error
    # NOTE(review): get_user_params() is not defined in this class; presumably
    # it is provided by the framework that derives this device -- confirm.
    server_path = self.get_user_params()['mh_files']['grpc_server'][0]
    logging.info('Start gRPC server: %s', server_path)
    # NOTE(review): stdout/stderr are piped but never read in this class; a
    # chatty server could block once the pipe buffer fills -- confirm whether
    # a reader exists elsewhere.
    self.server_proc = subprocess.Popen([server_path],
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE,
                                        universal_newlines=True,
                                        bufsize=0)

    # NOTE(review): `loas2` is not imported in the visible part of this file;
    # presumably it comes from the elided internal import -- confirm.
    self.channel_creds = loas2.loas2_channel_credentials()
    self.channel = grpc.secure_channel(FLAGS.server, self.channel_creds)
    # Block until the channel is connected to the server.
    grpc.channel_ready_future(self.channel).result()
    self.stub = blueberry_device_controller_pb2_grpc.BlueberryDeviceControllerStub(
        self.channel)

  def init_setup(self) -> None:
    """Placeholder for initial setup; currently only logs a message."""
    logging.info('init setup TO BE IMPLEMENTED')

  def set_target(self, bt_device: Any) -> None:
    """Remember `bt_device` as the target device.

    Args:
      bt_device: The device object to store as the target.
    """
    self._target_device = bt_device

  def pair_and_connect_bluetooth(
      self, target_mac_address: str) -> Optional[Tuple[float, float]]:
    """Pair and connect to a peripheral Bluetooth device.

    Args:
      target_mac_address: MAC address of the device to pair and connect to.

    Returns:
      A tuple (pairing_time_sec, connection_time_sec) taken from the RPC
      response, or None if the RPC failed or the response reported an error.
    """
    request = blueberry_device_controller_pb2.TargetMacAddress(
        mac_address=target_mac_address)
    try:
      # pytype: disable=attribute-error
      response = self.stub.PairAndConnectBluetooth(request)
    except grpc.RpcError as rpc_error:
      logging.error('pair and connect bluetooth RPC failed: %s', rpc_error)
      return None

    logging.info('pair and connect bluetooth response: %s', response)
    if response.error:
      # TODO: error handler TO BE IMPLEMENTED.
      logging.error('pair and connect bluetooth returned error: %s',
                    response.error)
      return None
    return response.pairing_time_sec, response.connection_time_sec