• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""An example of multiprocessing concurrency with gRPC."""
15
16from __future__ import absolute_import
17from __future__ import division
18from __future__ import print_function
19
20import argparse
21import atexit
22import logging
23import multiprocessing
24import operator
25import sys
26
27import grpc
28
29import prime_pb2
30import prime_pb2_grpc
31
32_PROCESS_COUNT = 8  # Number of worker processes passed to multiprocessing.Pool.
33_MAXIMUM_CANDIDATE = 10000  # Exclusive upper bound of range(2, ...) of candidates.
34
35# Each worker process initializes a single channel after forking.
36# It's regrettable, but to ensure that each subprocess only has to instantiate
37# a single channel to be reused across all RPCs, we use globals.
38_worker_channel_singleton = None  # Assigned by _initialize_worker().
39_worker_stub_singleton = None  # Assigned by _initialize_worker().
40
41_LOGGER = logging.getLogger(__name__)  # Handler and level are set in the __main__ block.
42
43
def _shutdown_worker():
    """Close this worker process's gRPC channel, if one was created.

    Registered with ``atexit`` by ``_initialize_worker``. When the channel
    singleton was never initialized, this only emits the log message.
    """
    _LOGGER.info('Shutting worker process down.')
    if _worker_channel_singleton is not None:
        # grpc.Channel is released with close(); stop() is a grpc.Server
        # method and does not exist on a channel.
        _worker_channel_singleton.close()
48
49
def _initialize_worker(server_address):
    """Create the per-process channel and stub and schedule their cleanup.

    Used as the ``initializer`` of the worker pool, so it runs once in each
    worker process.

    Args:
      server_address: Target address handed to ``grpc.insecure_channel``.
    """
    # pylint: disable=global-statement
    global _worker_channel_singleton, _worker_stub_singleton
    _LOGGER.info('Initializing worker process.')
    channel = grpc.insecure_channel(server_address)
    _worker_channel_singleton = channel
    # The stub wraps the same channel so every RPC in this process reuses it.
    _worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(channel)
    atexit.register(_shutdown_worker)
58
59
def _run_worker_query(primality_candidate):
    """Issue one ``check`` RPC for a candidate through this worker's stub.

    Args:
      primality_candidate: Value sent as the ``candidate`` field of the
        ``PrimeCandidate`` request.

    Returns:
      Whatever the stub's ``check`` call returns for the request.
    """
    _LOGGER.info('Checking primality of %s.', primality_candidate)
    check = _worker_stub_singleton.check
    request = prime_pb2.PrimeCandidate(candidate=primality_candidate)
    return check(request)
64
65
def _calculate_primes(server_address):
    """Check every integer in ``range(2, _MAXIMUM_CANDIDATE)`` via a worker pool.

    A pool of ``_PROCESS_COUNT`` processes is created, each initialized by
    ``_initialize_worker`` with ``server_address``; the candidates are then
    mapped over ``_run_worker_query``.

    Args:
      server_address: Address forwarded to each worker's initializer.

    Returns:
      A tuple of ``(candidate, isPrime)`` pairs, one per candidate, in the
      order of the candidate range. ``isPrime`` is read from each response.
    """
    worker_pool = multiprocessing.Pool(processes=_PROCESS_COUNT,
                                       initializer=_initialize_worker,
                                       initargs=(server_address,))
    check_range = range(2, _MAXIMUM_CANDIDATE)
    try:
        primality = worker_pool.map(_run_worker_query, check_range)
    finally:
        # Release the worker processes whether or not map() succeeded. Once
        # map() has returned, all results are already collected, so
        # terminate() loses no work. An explicit try/finally is used instead
        # of ``with Pool(...)`` because this file also targets Python 2.
        worker_pool.terminate()
        worker_pool.join()
    primes = zip(check_range, map(operator.attrgetter('isPrime'), primality))
    return tuple(primes)
74
75
def main():
    """Parse the server address from the command line and print the results.

    The single positional argument ``server_address`` is passed to
    ``_calculate_primes`` and the tuple it returns is printed.
    """
    description = 'Determine the primality of the first {} integers.'.format(
        _MAXIMUM_CANDIDATE)
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('server_address',
                        help='The address of the server (e.g. localhost:50051)')
    parsed = parser.parse_args()
    print(_calculate_primes(parsed.server_address))
85
86
if __name__ == '__main__':
    # Send this module's log records to stdout, each prefixed with the PID of
    # the process that emitted it.
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter('[PID %(process)d] %(message)s'))
    _LOGGER.setLevel(logging.INFO)
    _LOGGER.addHandler(handler)
    main()
94