#!/usr/bin/env ruby

# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'optparse'
require 'thread'
require_relative '../pb/test/client'
require_relative './metrics_server'
require_relative '../lib/grpc'

# Gauge named 'qps' that reports the average number of queries per second
# counted since the gauge was constructed.
class QpsGauge < Gauge
  def initialize
    @query_count = 0
    @query_mutex = Mutex.new
    @start_time = Time.now
  end

  # Counts one more query. The counter is updated under a mutex because
  # several worker threads call this on the same gauge.
  def increment_queries
    @query_mutex.synchronize { @query_count += 1 }
  end

  # Name under which this gauge is reported.
  def get_name
    'qps'
  end

  # Type tag of the value returned by get_value.
  def get_type
    'long'
  end

  # Returns the query count divided by the seconds elapsed since
  # construction, truncated to an Integer.
  def get_value
    @query_mutex.synchronize do
      elapsed = Time.now - @start_time
      # A zero elapsed time would yield Infinity or NaN, and Float#to_i
      # raises FloatDomainError for both; report 0 instead.
      elapsed > 0 ? (@query_count / elapsed).to_i : 0
    end
  end
end

# Creates a GRPC::RpcServer listening insecurely on 0.0.0.0:<port>,
# registers a MetricsServiceImpl with it and runs it on a new thread.
#
# Returns [server, service, server_thread].
def start_metrics_server(port)
  host = "0.0.0.0:#{port}"
  server = GRPC::RpcServer.new
  server.add_http2_port(host, :this_port_is_insecure)
  service = MetricsServiceImpl.new
  server.handle(service)
  server_thread = Thread.new { server.run_till_terminated }
  [server, service, server_thread]
end

# Settings of a stress run, as filled in by parse_stress_args.
StressArgs = Struct.new(:server_addresses, :test_cases, :duration,
                        :channels_per_server, :concurrent_calls, :metrics_port)

# Runs the stress test described by +stress_args+.
#
# For every server address, channels_per_server stubs are created, and for
# every stub concurrent_calls threads repeatedly invoke a test method
# sampled at random from test_cases, bumping the QPS gauge after each call.
#
# With a non-negative duration the run sleeps that many seconds, stops the
# workers and the metrics server, and prints the measured QPS. With a
# negative duration it only waits on the metrics server thread.
#
# Raises ArgumentError when test_cases is empty.
def start(stress_args)
  # Array#sample on an empty list returns nil and NamedTests#method(nil)
  # raises TypeError inside every worker thread, so refuse to start at all.
  if stress_args.test_cases.empty?
    raise ArgumentError,
          'no test cases to run; pass --test_cases name:weight[,name:weight...]'
  end

  running = true
  threads = []
  qps_gauge = QpsGauge.new
  metrics_server, metrics_service, metrics_thread =
    start_metrics_server(stress_args.metrics_port)
  metrics_service.register_gauge(qps_gauge)

  stress_args.server_addresses.each do |address|
    stress_args.channels_per_server.times do
      client_args = Args.new
      client_args.host, client_args.port = address.split(':')
      client_args.secure = false
      client_args.test_case = ''
      stub = create_stub(client_args)
      named_tests = NamedTests.new(stub, client_args)
      stress_args.concurrent_calls.times do
        threads << Thread.new do
          # `running` is the local flag shared with the thread that calls
          # start; it is flipped to false once the duration has elapsed.
          while running
            named_tests.method(stress_args.test_cases.sample).call
            qps_gauge.increment_queries
          end
        end
      end
    end
  end

  if stress_args.duration >= 0
    sleep stress_args.duration
    running = false
    metrics_server.stop
    p "QPS: #{qps_gauge.get_value}"
    threads.each(&:join)
  end
  metrics_thread.join
end

# Builds a StressArgs from the command line (ARGV is consumed by parse!).
#
# Defaults: one server at localhost:8080, no test cases, duration -1,
# one channel per server, one concurrent call, metrics port '8081'.
# --test_cases takes a comma separated list of name:weight items; each name
# is repeated weight times in the resulting test_cases list.
#
# Every flag except --test_cases declares its argument as optional, so its
# block receives nil when the flag is given without a value. Those nils are
# ignored so that the default stays in place.
def parse_stress_args
  stress_args = StressArgs.new
  stress_args.server_addresses = ['localhost:8080']
  stress_args.test_cases = []
  stress_args.duration = -1
  stress_args.channels_per_server = 1
  stress_args.concurrent_calls = 1
  stress_args.metrics_port = '8081'

  OptionParser.new do |opts|
    opts.on('--server_addresses [LIST]', Array) do |addrs|
      stress_args.server_addresses = addrs if addrs
    end
    opts.on('--test_cases cases', Array) do |cases|
      stress_args.test_cases = cases.flat_map do |item|
        name, weight = item.split(':')
        # A missing weight converts to 0, which drops the item.
        [name] * weight.to_i
      end
    end
    opts.on('--test_duration_secs [INT]', OptionParser::DecimalInteger) do |time|
      stress_args.duration = time if time
    end
    opts.on('--num_channels_per_server [INT]', OptionParser::DecimalInteger) do |channels|
      stress_args.channels_per_server = channels if channels
    end
    opts.on('--num_stubs_per_channel [INT]', OptionParser::DecimalInteger) do |stubs|
      stress_args.concurrent_calls = stubs if stubs
    end
    opts.on('--metrics_port [port]') do |port|
      stress_args.metrics_port = port if port
    end
  end.parse!

  stress_args
end

# Script entry point: parses the command line and runs the stress test.
def main
  opts = parse_stress_args
  start(opts)
end

main if __FILE__ == $PROGRAM_NAME