• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 The gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
16import threading
17import grpc
18import grpc.experimental
19import subprocess
20import sys
21import time
22import contextlib
23
24_PORT = 5741
25_MESSAGE_SIZE = 4
26_RESPONSE_COUNT = 32 * 1024
27
28_SERVER_CODE = """
29import datetime
30import threading
31import grpc
32from concurrent import futures
33from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2
34from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2_grpc
35
36class Handler(unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceServicer):
37
38  def Benchmark(self, request, context):
39    payload = b'\\x00\\x01' * int(request.message_size / 2)
40    for _ in range(request.response_count):
41      yield unary_stream_benchmark_pb2.BenchmarkResponse(response=payload)
42
43
44server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
45server.add_insecure_port('[::]:%d')
46unary_stream_benchmark_pb2_grpc.add_UnaryStreamBenchmarkServiceServicer_to_server(Handler(), server)
47server.start()
48server.wait_for_termination()
49""" % _PORT
50
51try:
52    from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2
53    from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2_grpc
54
55    _GRPC_CHANNEL_OPTIONS = [
56        ('grpc.max_metadata_size', 16 * 1024 * 1024),
57        ('grpc.max_receive_message_length', 64 * 1024 * 1024),
58        (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1),
59    ]
60
61    @contextlib.contextmanager
62    def _running_server():
63        server_process = subprocess.Popen([sys.executable, '-c', _SERVER_CODE],
64                                          stdout=subprocess.PIPE,
65                                          stderr=subprocess.PIPE)
66        try:
67            yield
68        finally:
69            server_process.terminate()
70            server_process.wait()
71            sys.stdout.write("stdout: {}".format(server_process.stdout.read()))
72            sys.stdout.flush()
73            sys.stdout.write("stderr: {}".format(server_process.stderr.read()))
74            sys.stdout.flush()
75
76    def profile(message_size, response_count):
77        request = unary_stream_benchmark_pb2.BenchmarkRequest(
78            message_size=message_size, response_count=response_count)
79        with grpc.insecure_channel('[::]:{}'.format(_PORT),
80                                   options=_GRPC_CHANNEL_OPTIONS) as channel:
81            stub = unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceStub(
82                channel)
83            start = datetime.datetime.now()
84            call = stub.Benchmark(request, wait_for_ready=True)
85            for message in call:
86                pass
87            end = datetime.datetime.now()
88        return end - start
89
90    def main():
91        with _running_server():
92            for i in range(1000):
93                latency = profile(_MESSAGE_SIZE, 1024)
94                sys.stdout.write("{}\n".format(latency.total_seconds()))
95                sys.stdout.flush()
96
97    if __name__ == '__main__':
98        main()
99
100except ImportError:
101    # NOTE(rbellevi): The test runner should not load this module.
102    pass
103