• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Defines a number of module-scope gRPC scenarios to test server shutdown."""
15
16import argparse
17from concurrent import futures
18import logging
19import os
20import queue
21import threading
22import time
23
24import grpc
25
26from tests.unit import test_common
27
28WAIT_TIME = 1000
29
30REQUEST = b"request"
31RESPONSE = b"response"
32
33SERVER_RAISES_EXCEPTION = "server_raises_exception"
34SERVER_DEALLOCATED = "server_deallocated"
35SERVER_FORK_CAN_EXIT = "server_fork_can_exit"
36
37_SERVICE_NAME = "test"
38FORK_EXIT = "ForkExit"
39
40
41def fork_and_exit(request, servicer_context):
42    pid = os.fork()
43    if pid == 0:
44        os._exit(0)
45    return RESPONSE
46
47
48class GenericHandler(grpc.GenericRpcHandler):
49    def service(self, handler_call_details):
50        if handler_call_details.method == FORK_EXIT:
51            return grpc.unary_unary_rpc_method_handler(fork_and_exit)
52        else:
53            return None
54
55
56_METHOD_HANDLERS = {
57    FORK_EXIT: grpc.unary_unary_rpc_method_handler(fork_and_exit)
58}
59
60
61def run_server(port_queue):
62    server = test_common.test_server()
63    port = server.add_insecure_port("[::]:0")
64    port_queue.put(port)
65    server.add_registered_method_handlers(_SERVICE_NAME, _METHOD_HANDLERS)
66    server.start()
67    # threading.Event.wait() does not exhibit the bug identified in
68    # https://github.com/grpc/grpc/issues/17093, sleep instead
69    time.sleep(WAIT_TIME)
70
71
72def run_test(args):
73    if args.scenario == SERVER_RAISES_EXCEPTION:
74        server = test_common.test_server()
75        server.start()
76        raise Exception()
77    elif args.scenario == SERVER_DEALLOCATED:
78        server = test_common.test_server()
79        server.start()
80        server.__del__()
81        while server._state.stage != grpc._server._ServerStage.STOPPED:
82            pass
83    elif args.scenario == SERVER_FORK_CAN_EXIT:
84        port_queue = queue.Queue()
85        thread = threading.Thread(target=run_server, args=(port_queue,))
86        thread.daemon = True
87        thread.start()
88        port = port_queue.get()
89        channel = grpc.insecure_channel("localhost:%d" % port)
90        multi_callable = channel.unary_unary(
91            grpc._common.fully_qualified_method(_SERVICE_NAME, FORK_EXIT),
92            _registered_method=True,
93        )
94        result, call = multi_callable.with_call(REQUEST, wait_for_ready=True)
95        os.wait()
96    else:
97        raise ValueError("unknown test scenario")
98
99
100if __name__ == "__main__":
101    logging.basicConfig()
102    parser = argparse.ArgumentParser()
103    parser.add_argument("scenario", type=str)
104    args = parser.parse_args()
105    run_test(args)
106