• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from concurrent import futures
16from typing import Tuple
17
18import grpc
19
20_REQUEST = b"\x00\x00\x00"
21_RESPONSE = b"\x00\x00\x00"
22
23_SERVICE_NAME = "test"
24_UNARY_UNARY = "UnaryUnary"
25_UNARY_UNARY_FILTERED = "UnaryUnaryFiltered"
26_UNARY_STREAM = "UnaryStream"
27_STREAM_UNARY = "StreamUnary"
28_STREAM_STREAM = "StreamStream"
29STREAM_LENGTH = 5
30TRIGGER_RPC_METADATA = ("control", "trigger_rpc")
31TRIGGER_RPC_TO_NEW_SERVER_METADATA = ("to_new_server", "")
32
33
34def handle_unary_unary(request, servicer_context):
35    if TRIGGER_RPC_METADATA in servicer_context.invocation_metadata():
36        for k, v in servicer_context.invocation_metadata():
37            if "port" in k:
38                unary_unary_call(port=int(v))
39            if "to_new_server" in k:
40                second_server = grpc.server(
41                    futures.ThreadPoolExecutor(max_workers=10)
42                )
43                second_server.add_generic_rpc_handlers((_GenericHandler(),))
44                second_server_port = second_server.add_insecure_port("[::]:0")
45                second_server.start()
46                unary_unary_call(port=second_server_port)
47                second_server.stop(0)
48    return _RESPONSE
49
50
51def handle_unary_stream(request, servicer_context):
52    for _ in range(STREAM_LENGTH):
53        yield _RESPONSE
54
55
56def handle_stream_unary(request_iterator, servicer_context):
57    return _RESPONSE
58
59
60def handle_stream_stream(request_iterator, servicer_context):
61    for request in request_iterator:
62        yield _RESPONSE
63
64
65class _MethodHandler(grpc.RpcMethodHandler):
66    def __init__(self, request_streaming, response_streaming):
67        self.request_streaming = request_streaming
68        self.response_streaming = response_streaming
69        self.request_deserializer = None
70        self.response_serializer = None
71        self.unary_unary = None
72        self.unary_stream = None
73        self.stream_unary = None
74        self.stream_stream = None
75        if self.request_streaming and self.response_streaming:
76            self.stream_stream = handle_stream_stream
77        elif self.request_streaming:
78            self.stream_unary = handle_stream_unary
79        elif self.response_streaming:
80            self.unary_stream = handle_unary_stream
81        else:
82            self.unary_unary = handle_unary_unary
83
84
85class _GenericHandler(grpc.GenericRpcHandler):
86    def service(self, handler_call_details):
87        if handler_call_details.method == _UNARY_UNARY:
88            return _MethodHandler(False, False)
89        if handler_call_details.method == _UNARY_UNARY_FILTERED:
90            return _MethodHandler(False, False)
91        elif handler_call_details.method == _UNARY_STREAM:
92            return _MethodHandler(False, True)
93        elif handler_call_details.method == _STREAM_UNARY:
94            return _MethodHandler(True, False)
95        elif handler_call_details.method == _STREAM_STREAM:
96            return _MethodHandler(True, True)
97        else:
98            return None
99
100
101RPC_METHOD_HANDLERS = {
102    _UNARY_UNARY_FILTERED: _MethodHandler(False, False),
103    _UNARY_UNARY: _MethodHandler(False, False),
104    _UNARY_STREAM: _MethodHandler(False, True),
105    _STREAM_UNARY: _MethodHandler(True, False),
106    _STREAM_STREAM: _MethodHandler(True, True),
107}
108
109REGISTERED_RPC_METHOD_HANDLERS = {
110    _UNARY_UNARY_FILTERED: _MethodHandler(False, False),
111    _UNARY_UNARY: _MethodHandler(False, False),
112    _UNARY_STREAM: _MethodHandler(False, True),
113    _STREAM_UNARY: _MethodHandler(True, False),
114    _STREAM_STREAM: _MethodHandler(True, True),
115}
116
117
118def start_server(
119    interceptors=None, register_method=True
120) -> Tuple[grpc.Server, int]:
121    if interceptors:
122        server = grpc.server(
123            futures.ThreadPoolExecutor(max_workers=10),
124            interceptors=interceptors,
125        )
126    else:
127        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
128    generic_handler = grpc.method_handlers_generic_handler(
129        _SERVICE_NAME, RPC_METHOD_HANDLERS
130    )
131    server.add_generic_rpc_handlers((generic_handler,))
132    if register_method:
133        server.add_registered_method_handlers(
134            _SERVICE_NAME, REGISTERED_RPC_METHOD_HANDLERS
135        )
136    port = server.add_insecure_port("[::]:0")
137    server.start()
138    return server, port
139
140
141def unary_unary_call(port, metadata=None, registered_method=False):
142    with grpc.insecure_channel(f"localhost:{port}") as channel:
143        multi_callable = channel.unary_unary(
144            grpc._common.fully_qualified_method(_SERVICE_NAME, _UNARY_UNARY),
145            _registered_method=registered_method,
146        )
147        if metadata:
148            unused_response, call = multi_callable.with_call(
149                _REQUEST, metadata=metadata
150            )
151        else:
152            unused_response, call = multi_callable.with_call(_REQUEST)
153
154
155def intercepted_unary_unary_call(port, interceptors, metadata=None):
156    with grpc.insecure_channel(f"localhost:{port}") as channel:
157        intercept_channel = grpc.intercept_channel(channel, interceptors)
158        multi_callable = intercept_channel.unary_unary(
159            grpc._common.fully_qualified_method(_SERVICE_NAME, _UNARY_UNARY)
160        )
161        if metadata:
162            unused_response, call = multi_callable.with_call(
163                _REQUEST, metadata=metadata
164            )
165        else:
166            unused_response, call = multi_callable.with_call(_REQUEST)
167
168
169def unary_unary_filtered_call(port, metadata=None):
170    with grpc.insecure_channel(f"localhost:{port}") as channel:
171        multi_callable = channel.unary_unary(
172            grpc._common.fully_qualified_method(
173                _SERVICE_NAME, _UNARY_UNARY_FILTERED
174            )
175        )
176        if metadata:
177            unused_response, call = multi_callable.with_call(
178                _REQUEST, metadata=metadata
179            )
180        else:
181            unused_response, call = multi_callable.with_call(_REQUEST)
182
183
184def unary_stream_call(port):
185    with grpc.insecure_channel(f"localhost:{port}") as channel:
186        multi_callable = channel.unary_stream(
187            grpc._common.fully_qualified_method(_SERVICE_NAME, _UNARY_STREAM)
188        )
189        call = multi_callable(_REQUEST)
190        for _ in call:
191            pass
192
193
194def stream_unary_call(port):
195    with grpc.insecure_channel(f"localhost:{port}") as channel:
196        multi_callable = channel.stream_unary(
197            grpc._common.fully_qualified_method(_SERVICE_NAME, _STREAM_UNARY)
198        )
199        unused_response, call = multi_callable.with_call(
200            iter([_REQUEST] * STREAM_LENGTH)
201        )
202
203
204def stream_stream_call(port):
205    with grpc.insecure_channel(f"localhost:{port}") as channel:
206        multi_callable = channel.stream_stream(
207            grpc._common.fully_qualified_method(_SERVICE_NAME, _STREAM_STREAM)
208        )
209        call = multi_callable(iter([_REQUEST] * STREAM_LENGTH))
210        for _ in call:
211            pass
212