1"""gRPC mock target for testing purposes. 2 3Example MH testbed config for Hostside: 4- name: GrpcBtTargetStub-1 5 devices: 6 - type: MiscTestbedSubDevice 7 dimensions: 8 mobly_type: DerivedBtDevice 9 properties: 10 ModuleName: grpc_bt_target_mock 11 ClassName: GrpcBtTargetMock 12 Params: 13 config: 14 mac_address: FE:ED:BE:EF:CA:FE 15 dimensions: 16 device: GrpcBtTargetStub 17""" 18 19import subprocess 20from typing import Dict 21 22from absl import flags 23from absl import logging 24import grpc 25 26# Internal import 27from blueberry.grpc.proto import blueberry_device_controller_pb2 28from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc 29 30 31FLAGS = flags.FLAGS 32 33 34class GrpcBtTargetMock(object): 35 """BT Mock Target for testing the GRPC interface.""" 36 37 def __init__(self, config: Dict[str, str]) -> None: 38 """Initialize GRPC object.""" 39 super(GrpcBtTargetMock, self).__init__() 40 self.config = config 41 self.mac_address = config['mac_address'] 42 43 def __del__(self) -> None: 44 # pytype: disable=attribute-error 45 self.server_proc.terminate() 46 del self.channel_creds 47 del self.channel 48 del self.stub 49 50 def setup(self) -> None: 51 """Setup the gRPC server that the target mock will respond to.""" 52 # pytype: disable=attribute-error 53 server_path = self.get_user_params()['mh_files']['grpc_server'][0] 54 logging.info('Start gRPC server: %s', server_path) 55 self.server_proc = subprocess.Popen([server_path], 56 stdin=subprocess.PIPE, 57 stdout=subprocess.PIPE, 58 stderr=subprocess.PIPE, 59 universal_newlines=True, 60 bufsize=0) 61 62 self.channel_creds = loas2.loas2_channel_credentials() 63 self.channel = grpc.secure_channel(FLAGS.server, self.channel_creds) 64 grpc.channel_ready_future(self.channel).result() 65 self.stub = blueberry_device_controller_pb2_grpc.BlueberryDeviceControllerStub( 66 self.channel) 67 68 def activate_pairing_mode(self) -> int: 69 """Activates Bluetooth pairing mode.""" 70 logging.info('activate pairing mode TO BE IMPLEMENTED') 71 request = blueberry_device_controller_pb2.DiscoverableMode(mode=True) 72 try: 73 # pytype: disable=attribute-error 74 response = self.stub.SetDiscoverableMode(request) 75 logging.info('set discoverageble response: %s', response) 76 return 0 77 except grpc.RpcError as rpc_error: 78 print(rpc_error) 79 return -1 80 81 def factory_reset_bluetooth(self) -> None: 82 logging.info('factory reset TO BE IMPLEMENTED') 83 84 def get_bluetooth_mac_address(self) -> str: 85 logging.info('mac_address: %s', self.mac_address) 86 return self.mac_address 87