• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env ruby
2
3# Copyright 2016 gRPC authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17require 'optparse'
18require 'thread'
19require_relative '../pb/test/client'
20require_relative './metrics_server'
21require_relative '../lib/grpc'
22
# Gauge that reports the average number of queries per second recorded since
# the gauge was created.
#
# The counter is guarded by a mutex so that increment_queries can be called
# from several worker threads.
class QpsGauge < Gauge
  def initialize
    @query_count = 0
    @query_mutex = Mutex.new
    # Elapsed time is measured on the monotonic clock so that wall-clock
    # adjustments cannot make the measured interval zero or negative.
    @start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Records one more completed query.
  def increment_queries
    @query_mutex.synchronize { @query_count += 1 }
  end

  # Returns the name of this gauge.
  def get_name
    'qps'
  end

  # Returns the type label of the value produced by get_value.
  def get_type
    'long'
  end

  # Returns the average queries per second since construction, truncated to
  # an Integer. Returns 0 when no measurable time has elapsed, because
  # dividing by a zero interval yields NaN or Infinity and Float#to_i raises
  # FloatDomainError for those values.
  def get_value
    count = @query_mutex.synchronize { @query_count }
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start_time
    return 0 unless elapsed > 0

    (count / elapsed).to_i
  end
end
50
# Creates a GRPC::RpcServer bound to 0.0.0.0 on +port+ (registered with the
# :this_port_is_insecure argument), attaches a new MetricsServiceImpl to it
# and runs the server on a background thread.
#
# Returns a three-element array: [rpc server, metrics service, server thread].
def start_metrics_server(port)
  rpc_server = GRPC::RpcServer.new
  rpc_server.add_http2_port("0.0.0.0:#{port}", :this_port_is_insecure)

  metrics_service = MetricsServiceImpl.new
  rpc_server.handle(metrics_service)

  runner = Thread.new do
    rpc_server.run_till_terminated
  end

  [rpc_server, metrics_service, runner]
end
60
# Settings for one stress-test run. Filled in by parse_stress_args and
# consumed by start.
StressArgs = Struct.new(
  :server_addresses,    # list of "host:port" strings
  :test_cases,          # list of test-case names, sampled at random per call
  :duration,            # seconds to run; negative skips the timed shutdown
  :channels_per_server, # stubs created per server address
  :concurrent_calls,    # worker threads per stub
  :metrics_port         # port the metrics server listens on
)
63
# Runs the stress test described by +stress_args+.
#
# Starts the metrics server and registers a QpsGauge with it, then, for each
# server address, creates channels_per_server stubs and spawns
# concurrent_calls worker threads per stub. Each worker repeatedly invokes a
# randomly sampled test case on its NamedTests object and bumps the gauge.
#
# When duration is non-negative the method sleeps that many seconds, tells
# the workers to stop, stops the metrics server, prints the measured QPS and
# joins the workers. In every case it finishes by joining the metrics
# server thread.
def start(stress_args)
  # Flag shared with the worker closures; flipping it ends their loops.
  keep_running = true
  workers = []
  gauge = QpsGauge.new
  metrics_server, metrics_service, metrics_thread =
    start_metrics_server(stress_args.metrics_port)
  metrics_service.register_gauge(gauge)

  stress_args.server_addresses.each do |address|
    stress_args.channels_per_server.times do
      host, port = address.split(':')
      client_args = Args.new
      client_args.host = host
      client_args.port = port
      client_args.secure = false
      client_args.test_case = ''
      tests = NamedTests.new(create_stub(client_args), client_args)

      spawned = stress_args.concurrent_calls.times.map do
        Thread.new do
          while keep_running
            test_name = stress_args.test_cases.sample
            tests.method(test_name).call
            gauge.increment_queries
          end
        end
      end
      workers.concat(spawned)
    end
  end

  if stress_args.duration >= 0
    sleep stress_args.duration
    keep_running = false
    metrics_server.stop
    summary = "QPS: #{gauge.get_value}"
    p summary
    workers.each(&:join)
  end

  metrics_thread.join
end
98
99def parse_stress_args
100  stress_args = StressArgs.new
101  stress_args.server_addresses = ['localhost:8080']
102  stress_args.test_cases = []
103  stress_args.duration = -1
104  stress_args.channels_per_server = 1
105  stress_args.concurrent_calls = 1
106  stress_args.metrics_port = '8081'
107  OptionParser.new do |opts|
108    opts.on('--server_addresses [LIST]', Array) do |addrs|
109      stress_args.server_addresses = addrs
110    end
111    opts.on('--test_cases cases', Array) do |cases|
112      stress_args.test_cases = (cases.map do |item|
113                                  split = item.split(':')
114                                  [split[0]] * split[1].to_i
115                                end).reduce([], :+)
116    end
117    opts.on('--test_duration_secs [INT]', OptionParser::DecimalInteger) do |time|
118      stress_args.duration = time
119    end
120    opts.on('--num_channels_per_server [INT]', OptionParser::DecimalInteger) do |channels|
121      stress_args.channels_per_server = channels
122    end
123    opts.on('--num_stubs_per_channel [INT]', OptionParser::DecimalInteger) do |stubs|
124      stress_args.concurrent_calls = stubs
125    end
126    opts.on('--metrics_port [port]') do |port|
127      stress_args.metrics_port = port
128    end
129  end.parse!
130  stress_args
131end
132
# Entry point: parses the command-line options and runs the stress test.
def main
  start(parse_stress_args)
end
137
# Run main only when this file is executed directly, not when it is required.
main if __FILE__ == $PROGRAM_NAME
141