1# Copyright 2019 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""An example of multiprocess concurrency with gRPC.""" 15 16from __future__ import absolute_import 17from __future__ import division 18from __future__ import print_function 19 20from concurrent import futures 21import contextlib 22import datetime 23import logging 24import math 25import multiprocessing 26import time 27import socket 28import sys 29 30import grpc 31 32import prime_pb2 33import prime_pb2_grpc 34 35_LOGGER = logging.getLogger(__name__) 36 37_ONE_DAY = datetime.timedelta(days=1) 38_PROCESS_COUNT = multiprocessing.cpu_count() 39_THREAD_CONCURRENCY = _PROCESS_COUNT 40 41 42def is_prime(n): 43 for i in range(2, int(math.ceil(math.sqrt(n)))): 44 if n % i == 0: 45 return False 46 else: 47 return True 48 49 50class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer): 51 52 def check(self, request, context): 53 _LOGGER.info('Determining primality of %s', request.candidate) 54 return prime_pb2.Primality(isPrime=is_prime(request.candidate)) 55 56 57def _wait_forever(server): 58 try: 59 while True: 60 time.sleep(_ONE_DAY.total_seconds()) 61 except KeyboardInterrupt: 62 server.stop(None) 63 64 65def _run_server(bind_address): 66 """Start a server in a subprocess.""" 67 _LOGGER.info('Starting new server.') 68 options = (('grpc.so_reuseport', 1),) 69 70 server = grpc.server(futures.ThreadPoolExecutor( 71 max_workers=_THREAD_CONCURRENCY,), 72 options=options) 73 prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server) 74 server.add_insecure_port(bind_address) 75 server.start() 76 _wait_forever(server) 77 78 79@contextlib.contextmanager 80def _reserve_port(): 81 """Find and reserve a port for all subprocesses to use.""" 82 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 83 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 84 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0: 85 raise RuntimeError("Failed to set SO_REUSEPORT.") 86 sock.bind(('', 0)) 87 try: 88 yield sock.getsockname()[1] 89 finally: 90 sock.close() 91 92 93def main(): 94 with _reserve_port() as port: 95 bind_address = 'localhost:{}'.format(port) 96 _LOGGER.info("Binding to '%s'", bind_address) 97 sys.stdout.flush() 98 workers = [] 99 for _ in range(_PROCESS_COUNT): 100 # NOTE: It is imperative that the worker subprocesses be forked before 101 # any gRPC servers start up. See 102 # https://github.com/grpc/grpc/issues/16001 for more details. 103 worker = multiprocessing.Process(target=_run_server, 104 args=(bind_address,)) 105 worker.start() 106 workers.append(worker) 107 for worker in workers: 108 worker.join() 109 110 111if __name__ == '__main__': 112 handler = logging.StreamHandler(sys.stdout) 113 formatter = logging.Formatter('[PID %(process)d] %(message)s') 114 handler.setFormatter(formatter) 115 _LOGGER.addHandler(handler) 116 _LOGGER.setLevel(logging.INFO) 117 main() 118