1# Copyright 2019 the gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""An example of cancelling requests in gRPC.""" 15 16from __future__ import absolute_import 17from __future__ import division 18from __future__ import print_function 19 20from concurrent import futures 21import argparse 22import logging 23import threading 24 25import grpc 26import search 27 28from examples.python.cancellation import hash_name_pb2 29from examples.python.cancellation import hash_name_pb2_grpc 30 31_LOGGER = logging.getLogger(__name__) 32_SERVER_HOST = 'localhost' 33 34_DESCRIPTION = "A server for finding hashes similar to names." 


def _stop_event_for(context):
    """Return an event that is set once the RPC behind `context` is done.

    The event is handed to the search so that it can stop working, and
    thereby free the servicer thread, as soon as the RPC terminates.
    """
    stop_event = threading.Event()

    def on_rpc_done():
        _LOGGER.debug("Attempting to regain servicer thread.")
        stop_event.set()

    context.add_callback(on_rpc_done)
    return stop_event


class HashFinder(hash_name_pb2_grpc.HashFinderServicer):
    """Servicer that searches for hashes resembling a requested name."""

    def __init__(self, maximum_hashes):
        """Store the per-RPC search budget.

        Args:
          maximum_hashes: Limit forwarded to `search.search` for every RPC.
        """
        super(HashFinder, self).__init__()
        self._maximum_hashes = maximum_hashes

    def Find(self, request, context):
        """Handle a unary request, replying with the last candidate found.

        Args:
          request: Message providing `desired_name` and
            `ideal_hamming_distance`.
          context: The servicer context of this RPC.

        Returns:
          The final candidate produced by the search, or an empty
          `HashNameResponse` when the search produced none (including when
          it was aborted by `search.ResourceLimitExceededError`).
        """
        stop_event = _stop_event_for(context)
        candidates = []
        try:
            candidates = list(
                search.search(request.desired_name,
                              request.ideal_hamming_distance, stop_event,
                              self._maximum_hashes))
        except search.ResourceLimitExceededError:
            _LOGGER.info("Cancelling RPC due to exhausted resources.")
            context.cancel()
        _LOGGER.debug("Servicer thread returning.")
        if not candidates:
            return hash_name_pb2.HashNameResponse()
        return candidates[-1]

    def FindRange(self, request, context):
        """Handle a streaming request, yielding candidates as they are found.

        Args:
          request: Message providing `desired_name`,
            `ideal_hamming_distance` and `interesting_hamming_distance`.
          context: The servicer context of this RPC.

        Yields:
          Every candidate produced by the search, in order. The RPC is
          cancelled if the search raises
          `search.ResourceLimitExceededError`.
        """
        stop_event = _stop_event_for(context)
        secret_generator = search.search(
            request.desired_name,
            request.ideal_hamming_distance,
            stop_event,
            self._maximum_hashes,
            interesting_hamming_distance=request.interesting_hamming_distance)
        try:
            for candidate in secret_generator:
                yield candidate
        except search.ResourceLimitExceededError:
            _LOGGER.info("Cancelling RPC due to exhausted resources.")
            context.cancel()
        _LOGGER.debug("Regained servicer thread.")


def _running_server(port, maximum_hashes):
    """Create, start and return a gRPC server hosting `HashFinder`.

    Args:
      port: Port to listen on at `_SERVER_HOST`.
      maximum_hashes: Search budget handed to the `HashFinder` servicer.

    Returns:
      The started `grpc.Server`.
    """
    # We use only a single servicer thread here to demonstrate that, if
    # managed carefully, cancelled RPCs need not continue occupying servicer
    # threads.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=1),
                         maximum_concurrent_rpcs=1)
    hash_name_pb2_grpc.add_HashFinderServicer_to_server(
        HashFinder(maximum_hashes), server)
    address = '{}:{}'.format(_SERVER_HOST, port)
    actual_port = server.add_insecure_port(address)
    server.start()
    # Report the port that was actually bound: it differs from the requested
    # one when the caller asked for port 0 (let the OS choose).
    print("Server listening at '{}:{}'".format(_SERVER_HOST, actual_port))
    return server


def main():
    """Parse command-line flags, start the server and block until it stops."""
    parser = argparse.ArgumentParser(description=_DESCRIPTION)
    parser.add_argument('--port',
                        type=int,
                        default=50051,
                        nargs='?',
                        help='The port on which the server will listen.')
    parser.add_argument(
        '--maximum-hashes',
        type=int,
        default=1000000,
        nargs='?',
        help='The maximum number of hashes to search before cancelling.')
    args = parser.parse_args()
    server = _running_server(args.port, args.maximum_hashes)
    server.wait_for_termination()


if __name__ == "__main__":
    logging.basicConfig()
    main()