• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""An example of multiprocess concurrency with gRPC."""
15
16from __future__ import absolute_import
17from __future__ import division
18from __future__ import print_function
19
20from concurrent import futures
21import contextlib
22import datetime
23import logging
24import math
25import multiprocessing
26import time
27import socket
28import sys
29
30import grpc
31
32import prime_pb2
33import prime_pb2_grpc
34
35_LOGGER = logging.getLogger(__name__)
36
37_ONE_DAY = datetime.timedelta(days=1)
38_PROCESS_COUNT = multiprocessing.cpu_count()
39_THREAD_CONCURRENCY = _PROCESS_COUNT
40
41
42def is_prime(n):
43    for i in range(2, int(math.ceil(math.sqrt(n)))):
44        if n % i == 0:
45            return False
46    else:
47        return True
48
49
50class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer):
51
52    def check(self, request, context):
53        _LOGGER.info('Determining primality of %s', request.candidate)
54        return prime_pb2.Primality(isPrime=is_prime(request.candidate))
55
56
57def _wait_forever(server):
58    try:
59        while True:
60            time.sleep(_ONE_DAY.total_seconds())
61    except KeyboardInterrupt:
62        server.stop(None)
63
64
65def _run_server(bind_address):
66    """Start a server in a subprocess."""
67    _LOGGER.info('Starting new server.')
68    options = (('grpc.so_reuseport', 1),)
69
70    server = grpc.server(futures.ThreadPoolExecutor(
71        max_workers=_THREAD_CONCURRENCY,),
72                         options=options)
73    prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server)
74    server.add_insecure_port(bind_address)
75    server.start()
76    _wait_forever(server)
77
78
79@contextlib.contextmanager
80def _reserve_port():
81    """Find and reserve a port for all subprocesses to use."""
82    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
83    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
84    if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
85        raise RuntimeError("Failed to set SO_REUSEPORT.")
86    sock.bind(('', 0))
87    try:
88        yield sock.getsockname()[1]
89    finally:
90        sock.close()
91
92
93def main():
94    with _reserve_port() as port:
95        bind_address = 'localhost:{}'.format(port)
96        _LOGGER.info("Binding to '%s'", bind_address)
97        sys.stdout.flush()
98        workers = []
99        for _ in range(_PROCESS_COUNT):
100            # NOTE: It is imperative that the worker subprocesses be forked before
101            # any gRPC servers start up. See
102            # https://github.com/grpc/grpc/issues/16001 for more details.
103            worker = multiprocessing.Process(target=_run_server,
104                                             args=(bind_address,))
105            worker.start()
106            workers.append(worker)
107        for worker in workers:
108            worker.join()
109
110
111if __name__ == '__main__':
112    handler = logging.StreamHandler(sys.stdout)
113    formatter = logging.Formatter('[PID %(process)d] %(message)s')
114    handler.setFormatter(formatter)
115    _LOGGER.addHandler(handler)
116    _LOGGER.setLevel(logging.INFO)
117    main()
118