# Copyright 2019 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Micro-benchmark for single-threaded unary-stream gRPC calls.

Spawns a server in a subprocess that streams fixed-size payloads, then
repeatedly times how long the client takes to drain one response stream,
printing one latency (in seconds) per line.
"""

import datetime
import threading
import grpc
import grpc.experimental
import subprocess
import sys
import time
import contextlib

_PORT = 5741
_MESSAGE_SIZE = 4
_RESPONSE_COUNT = 32 * 1024

# Server code run via `python -c` so the server gets its own process (and
# its own GIL) and cannot perturb the client-side measurement.
_SERVER_CODE = """
import datetime
import threading
import grpc
from concurrent import futures
from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2
from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2_grpc

class Handler(unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceServicer):

    def Benchmark(self, request, context):
        payload = b'\\x00\\x01' * int(request.message_size / 2)
        for _ in range(request.response_count):
            yield unary_stream_benchmark_pb2.BenchmarkResponse(response=payload)


server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
server.add_insecure_port('[::]:%d')
unary_stream_benchmark_pb2_grpc.add_UnaryStreamBenchmarkServiceServicer_to_server(Handler(), server)
server.start()
server.wait_for_termination()
""" % _PORT

try:
    from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2
    from src.python.grpcio_tests.tests.stress import unary_stream_benchmark_pb2_grpc

    _GRPC_CHANNEL_OPTIONS = [
        ('grpc.max_metadata_size', 16 * 1024 * 1024),
        ('grpc.max_receive_message_length', 64 * 1024 * 1024),
        (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1),
    ]

    @contextlib.contextmanager
    def _running_server():
        """Runs the benchmark server in a subprocess for the managed scope.

        On exit, terminates the server and echoes its captured stdout and
        stderr so failures remain diagnosable.
        """
        server_process = subprocess.Popen([sys.executable, '-c', _SERVER_CODE],
                                          stdout=subprocess.PIPE,
                                          stderr=subprocess.PIPE)
        try:
            yield
        finally:
            server_process.terminate()
            # communicate() drains both pipes while waiting for exit;
            # wait() followed by read() can deadlock if the child filled
            # an OS pipe buffer before terminating.
            stdout, stderr = server_process.communicate()
            sys.stdout.write("stdout: {}".format(stdout))
            sys.stdout.flush()
            sys.stdout.write("stderr: {}".format(stderr))
            sys.stdout.flush()

    def profile(message_size, response_count):
        """Times draining one unary-stream call.

        Args:
          message_size: Size in bytes of each streamed response payload.
          response_count: Number of messages in the response stream.

        Returns:
          Elapsed time for the call as a datetime.timedelta.
        """
        request = unary_stream_benchmark_pb2.BenchmarkRequest(
            message_size=message_size, response_count=response_count)
        with grpc.insecure_channel('[::]:{}'.format(_PORT),
                                   options=_GRPC_CHANNEL_OPTIONS) as channel:
            stub = unary_stream_benchmark_pb2_grpc.UnaryStreamBenchmarkServiceStub(
                channel)
            # perf_counter is monotonic; datetime.now() is wall-clock time
            # and can jump (e.g. under NTP adjustment), yielding skewed or
            # even negative latencies.
            start = time.perf_counter()
            call = stub.Benchmark(request, wait_for_ready=True)
            for _ in call:
                pass
            end = time.perf_counter()
        return datetime.timedelta(seconds=end - start)

    def main(iterations=1000, response_count=1024):
        """Runs the benchmark, printing one latency in seconds per line.

        Args:
          iterations: Number of timed calls to perform.
          response_count: Stream length requested for each call.
        """
        with _running_server():
            for _ in range(iterations):
                latency = profile(_MESSAGE_SIZE, response_count)
                sys.stdout.write("{}\n".format(latency.total_seconds()))
                sys.stdout.flush()

    if __name__ == '__main__':
        main()

except ImportError:
    # NOTE(rbellevi): The test runner should not load this module.
    pass