• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""gRPC mock target for testing purposes.
2
3Example MH testbed config for Hostside:
4- name: GrpcBtTargetStub-1
5  devices:
6  - type: MiscTestbedSubDevice
7    dimensions:
8      mobly_type: DerivedBtDevice
9    properties:
10      ModuleName: grpc_bt_target_mock
11      ClassName: GrpcBtTargetMock
12      Params:
13        config:
14          mac_address: FE:ED:BE:EF:CA:FE
15  dimensions:
16    device: GrpcBtTargetStub
17"""
18
19import subprocess
20from typing import Dict
21
22from absl import flags
23from absl import logging
24import grpc
25
26# Internal import
27from blueberry.grpc.proto import blueberry_device_controller_pb2
28from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc
29
30
31FLAGS = flags.FLAGS
32
33
class GrpcBtTargetMock(object):
  """BT Mock Target for testing the GRPC interface.

  The mock launches a gRPC server binary as a child process in `setup()` and
  talks to it through a `BlueberryDeviceControllerStub`.

  Attributes:
    config: The configuration dict passed to the constructor.
    mac_address: Value of config['mac_address'].
    server_proc: The server `subprocess.Popen`, or None before `setup()`.
    channel_creds: Channel credentials, or None before `setup()`.
    channel: The gRPC channel to the server, or None before `setup()`.
    stub: The device-controller stub, or None before `setup()`.
  """

  def __init__(self, config: Dict[str, str]) -> None:
    """Initializes the mock target from its testbed config.

    Args:
      config: Device configuration; must contain a 'mac_address' entry.

    Raises:
      KeyError: If 'mac_address' is missing from `config`.
    """
    super().__init__()
    # Declare the setup()-owned attributes up front so teardown is well
    # defined even when setup() never ran or failed part-way.
    self.server_proc = None
    self.channel_creds = None
    self.channel = None
    self.stub = None
    self.config = config
    self.mac_address = config['mac_address']

  def __del__(self) -> None:
    """Stops the server process and releases the gRPC client objects.

    Safe to call on an instance whose `setup()` never completed, and safe to
    call more than once.
    """
    # pytype: disable=attribute-error
    # getattr() guards against __init__ having been bypassed or having raised
    # before the attributes were assigned.
    server_proc = getattr(self, 'server_proc', None)
    if server_proc is not None:
      server_proc.terminate()
    channel = getattr(self, 'channel', None)
    if channel is not None:
      # Close explicitly instead of relying on garbage collection.
      channel.close()
    self.server_proc = None
    self.channel_creds = None
    self.channel = None
    self.stub = None

  def setup(self) -> None:
    """Setup the gRPC server that the target mock will respond to.

    Starts the server binary, opens a secure channel to `FLAGS.server`, waits
    for the channel to become ready and creates the controller stub.
    """
    # pytype: disable=attribute-error
    # NOTE(review): get_user_params() is not defined on this class; presumably
    # it is injected by the test framework -- confirm.
    server_path = self.get_user_params()['mh_files']['grpc_server'][0]
    logging.info('Start gRPC server: %s', server_path)
    # NOTE(review): stdout/stderr are piped but never read in this class; a
    # chatty server could block once the pipe buffer fills -- confirm whether
    # a caller drains them.
    self.server_proc = subprocess.Popen([server_path],
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE,
                                        universal_newlines=True,
                                        bufsize=0)

    # NOTE(review): `loas2` is not imported in the visible part of this file;
    # presumably it comes from the elided internal import -- confirm.
    self.channel_creds = loas2.loas2_channel_credentials()
    self.channel = grpc.secure_channel(FLAGS.server, self.channel_creds)
    # Blocks (without a timeout) until the channel is ready.
    grpc.channel_ready_future(self.channel).result()
    self.stub = blueberry_device_controller_pb2_grpc.BlueberryDeviceControllerStub(
        self.channel)

  def activate_pairing_mode(self) -> int:
    """Activates Bluetooth pairing mode.

    Sends a `SetDiscoverableMode` request with mode=True through the stub.

    Returns:
      0 if the RPC completed, -1 if it raised `grpc.RpcError`.
    """
    logging.info('activate pairing mode TO BE IMPLEMENTED')
    request = blueberry_device_controller_pb2.DiscoverableMode(mode=True)
    try:
      # pytype: disable=attribute-error
      response = self.stub.SetDiscoverableMode(request)
    except grpc.RpcError as rpc_error:
      # Report through the logger like the rest of the class, not stdout.
      logging.error('SetDiscoverableMode RPC failed: %s', rpc_error)
      return -1
    logging.info('set discoverageble response: %s', response)
    return 0

  def factory_reset_bluetooth(self) -> None:
    """Placeholder for a Bluetooth factory reset; currently only logs."""
    logging.info('factory reset TO BE IMPLEMENTED')

  def get_bluetooth_mac_address(self) -> str:
    """Logs and returns the MAC address taken from the config."""
    logging.info('mac_address: %s', self.mac_address)
    return self.mac_address
87