• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import grpc
6
7from chromiumos.test.api import callbox_service_pb2 as cbp
8from chromiumos.test.api import callbox_service_pb2_grpc as cbs
9
10from concurrent import futures
11
12
class CallBoxServer(cbs.CallboxServiceServicer):
    """gRPC servicer for the API described by callbox_service.proto."""

    def CheckHealth(self, request, context):
        """Liveness endpoint: answer so callers can tell the service is up.

        Neither `request` nor `context` is inspected; every call gets a
        default-constructed CheckHealthResponse.
        """
        health_reply = cbp.CheckHealthResponse()
        return health_reply
19
20
def serve(address='[::]:50051', max_workers=1):
    """Start the callbox gRPC server and return it without blocking.

    Args:
        address: Listen address handed to add_insecure_port. The default
            '[::]:50051' keeps the behavior of calling serve() with no
            arguments unchanged.
        max_workers: Size of the thread pool that handles RPCs. Defaults
            to a single worker thread.

    Returns:
        The started grpc server. The caller decides how long to keep it
        running (e.g. by calling wait_for_termination() on it).

    Raises:
        RuntimeError: If the server could not bind to `address`.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    cbs.add_CallboxServiceServicer_to_server(CallBoxServer(), server)
    # Some grpcio versions signal a failed bind by returning 0 instead of
    # raising; without this check we would hand back a server that is not
    # listening on anything.
    if server.add_insecure_port(address) == 0:
        raise RuntimeError('Failed to bind callbox server to %s' % address)
    server.start()
    return server
28
29
if __name__ == '__main__':
    # Script entry point: serve() returns as soon as the server has started,
    # so block here to keep the process alive until the server terminates.
    callbox_server = serve()
    callbox_server.wait_for_termination()
33