• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Defines a number of module-scope gRPC scenarios to test server shutdown."""
15
16import argparse
17import os
18import threading
19import time
20import logging
21
22import grpc
23from tests.unit import test_common
24
25from concurrent import futures
26from six.moves import queue
27
# Number of seconds run_server() sleeps after starting its server.
WAIT_TIME = 1000

# Payload sent by the client and payload returned by fork_and_exit().
REQUEST = b'request'
RESPONSE = b'response'

# Scenario names accepted as the command-line 'scenario' argument.
SERVER_RAISES_EXCEPTION = 'server_raises_exception'
SERVER_DEALLOCATED = 'server_deallocated'
SERVER_FORK_CAN_EXIT = 'server_fork_can_exit'

# Method name that GenericHandler routes to fork_and_exit().
FORK_EXIT = '/test/ForkExit'
38
39
def fork_and_exit(request, servicer_context):
    """Unary-unary behavior that forks a child which exits immediately.

    Neither argument is inspected. The child process (where os.fork()
    returns 0) terminates at once through os._exit(0); the parent process
    returns RESPONSE to the caller.
    """
    if os.fork() == 0:
        # Child side of the fork: leave right away with status 0.
        os._exit(0)
    return RESPONSE
45
46
class GenericHandler(grpc.GenericRpcHandler):
    """Generic RPC handler that serves only the FORK_EXIT method."""

    def service(self, handler_call_details):
        """Return the fork_and_exit method handler for FORK_EXIT.

        Any other method name yields None.
        """
        if handler_call_details.method != FORK_EXIT:
            return None
        return grpc.unary_unary_rpc_method_handler(fork_and_exit)
54
55
def run_server(port_queue):
    """Start a test server and then sleep for WAIT_TIME seconds.

    The server is bound to '[::]:0'; the port number returned by
    add_insecure_port() is put on port_queue before the handlers are
    registered and the server is started.
    """
    server = test_common.test_server()
    port_queue.put(server.add_insecure_port('[::]:0'))
    handlers = (GenericHandler(),)
    server.add_generic_rpc_handlers(handlers)
    server.start()
    # Sleep on purpose instead of blocking on threading.Event.wait(): the
    # latter does not exhibit the bug identified in
    # https://github.com/grpc/grpc/issues/17093.
    time.sleep(WAIT_TIME)
65
66
def run_test(args):
    """Run the server-shutdown scenario selected by args.scenario.

    Scenarios:
      SERVER_RAISES_EXCEPTION: start a server, then raise a bare Exception.
      SERVER_DEALLOCATED: start a server, call its __del__() directly and
        spin until its stage reads STOPPED.
      SERVER_FORK_CAN_EXIT: run run_server() in a daemon thread, invoke the
        FORK_EXIT method against it, then os.wait() for a child process.

    Raises:
      Exception: always, in the SERVER_RAISES_EXCEPTION scenario.
      ValueError: if args.scenario is none of the scenarios above.
    """
    scenario = args.scenario

    if scenario == SERVER_RAISES_EXCEPTION:
        server = test_common.test_server()
        server.start()
        raise Exception()

    if scenario == SERVER_DEALLOCATED:
        server = test_common.test_server()
        server.start()
        server.__del__()
        stopped = grpc._server._ServerStage.STOPPED
        # Busy-wait until the server reports the STOPPED stage.
        while server._state.stage != stopped:
            pass
        return

    if scenario == SERVER_FORK_CAN_EXIT:
        port_queue = queue.Queue()
        server_thread = threading.Thread(target=run_server,
                                         args=(port_queue,))
        server_thread.daemon = True
        server_thread.start()
        # Blocks until run_server() has published the bound port.
        target = 'localhost:%d' % port_queue.get()
        channel = grpc.insecure_channel(target)
        multi_callable = channel.unary_unary(FORK_EXIT)
        # The results are unused but stay bound until this function returns.
        _response, _call = multi_callable.with_call(
            REQUEST, wait_for_ready=True)
        os.wait()
        return

    raise ValueError('unknown test scenario')
90
91
if __name__ == '__main__':
    # Script entry point: configure default logging, read the single
    # positional 'scenario' argument and run the matching scenario.
    logging.basicConfig()
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('scenario', type=str)
    run_test(arg_parser.parse_args())
98