1# Copyright 2019 The gRPC Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""The Python example of utilizing wait-for-ready flag.""" 15 16from __future__ import print_function 17import logging 18from concurrent import futures 19from contextlib import contextmanager 20import socket 21import threading 22 23import grpc 24 25from examples import helloworld_pb2 26from examples import helloworld_pb2_grpc 27 28_LOGGER = logging.getLogger(__name__) 29_LOGGER.setLevel(logging.INFO) 30 31 32@contextmanager 33def get_free_loopback_tcp_port(): 34 if socket.has_ipv6: 35 tcp_socket = socket.socket(socket.AF_INET6) 36 else: 37 tcp_socket = socket.socket(socket.AF_INET) 38 tcp_socket.bind(('', 0)) 39 address_tuple = tcp_socket.getsockname() 40 yield "localhost:%s" % (address_tuple[1]) 41 tcp_socket.close() 42 43 44class Greeter(helloworld_pb2_grpc.GreeterServicer): 45 46 def SayHello(self, request, unused_context): 47 return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name) 48 49 50def create_server(server_address): 51 server = grpc.server(futures.ThreadPoolExecutor()) 52 helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) 53 bound_port = server.add_insecure_port(server_address) 54 assert bound_port == int(server_address.split(':')[-1]) 55 return server 56 57 58def process(stub, wait_for_ready=None): 59 try: 60 response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'), 61 wait_for_ready=wait_for_ready) 62 message = response.message 63 except grpc.RpcError as rpc_error: 64 assert rpc_error.code() == grpc.StatusCode.UNAVAILABLE 65 assert not wait_for_ready 66 message = rpc_error 67 else: 68 assert wait_for_ready 69 _LOGGER.info("Wait-for-ready %s, client received: %s", 70 "enabled" if wait_for_ready else "disabled", message) 71 72 73def main(): 74 # Pick a random free port 75 with get_free_loopback_tcp_port() as server_address: 76 77 # Register connectivity event to notify main thread 78 transient_failure_event = threading.Event() 79 80 def wait_for_transient_failure(channel_connectivity): 81 if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE: 82 transient_failure_event.set() 83 84 # Create gRPC channel 85 channel = grpc.insecure_channel(server_address) 86 channel.subscribe(wait_for_transient_failure) 87 stub = helloworld_pb2_grpc.GreeterStub(channel) 88 89 # Fire an RPC without wait_for_ready 90 thread_disabled_wait_for_ready = threading.Thread(target=process, 91 args=(stub, False)) 92 thread_disabled_wait_for_ready.start() 93 # Fire an RPC with wait_for_ready 94 thread_enabled_wait_for_ready = threading.Thread(target=process, 95 args=(stub, True)) 96 thread_enabled_wait_for_ready.start() 97 98 # Wait for the channel entering TRANSIENT FAILURE state. 99 transient_failure_event.wait() 100 server = create_server(server_address) 101 server.start() 102 103 # Expected to fail with StatusCode.UNAVAILABLE. 104 thread_disabled_wait_for_ready.join() 105 # Expected to success. 106 thread_enabled_wait_for_ready.join() 107 108 server.stop(None) 109 channel.close() 110 111 112if __name__ == '__main__': 113 logging.basicConfig(level=logging.INFO) 114 main() 115