• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env ruby
2
3# Copyright 2015 gRPC authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17# This is the xDS interop test Ruby client. This is meant to be run by
18# the run_xds_tests.py test runner.
19#
20# Usage: $ tools/run_tests/run_xds_tests.py --test_case=... ...
21#    --client_cmd="path/to/xds_client.rb --server=<hostname> \
22#                                        --stats_port=<port> \
23#                                        --qps=<qps>"
24
# Put the parent directory of this script's directory, and the lib/
# directory one level above that, on the load path so that the generated
# protobuf files can load grpc.
this_dir = File.expand_path(File.dirname(__FILE__))
pb_dir = File.dirname(this_dir)
lib_dir = File.join(File.dirname(pb_dir), 'lib')
[lib_dir, pb_dir].each do |dir|
  $LOAD_PATH.unshift(dir) unless $LOAD_PATH.include?(dir)
end
31
32require 'optparse'
33require 'logger'
34
35require_relative '../../lib/grpc'
36require 'google/protobuf'
37
38require_relative '../src/proto/grpc/testing/empty_pb'
39require_relative '../src/proto/grpc/testing/messages_pb'
40require_relative '../src/proto/grpc/testing/test_services_pb'
41
# Process-wide state shared by the stats server and the client loop:
# the list of active stats watchers, the mutex and condition variable
# used around it, and the flag that tells the client loop to stop.
$shutdown = false
$watchers_mutex = Mutex.new
$watchers_cv = ConditionVariable.new
$watchers = []
47
# RubyLogger provides a #logger method backed by a single shared
# standard-library Logger that writes to STDOUT at INFO level.
module RubyLogger
  LOGGER = Logger.new(STDOUT).tap { |log| log.level = Logger::INFO }

  # Returns the shared logger instance.
  def logger
    LOGGER
  end
end
57
# Reopen the general RPC module so that GRPC.logger is provided by
# RubyLogger.
module GRPC
  extend(RubyLogger)
end
63
# Builds the TestService stub used to send the test RPCs.
# opts must respond to #server; its string form is used as the target
# address and the channel is created with :this_channel_is_insecure.
def create_stub(opts)
  target = opts.server.to_s
  GRPC.logger.info("... connecting insecurely to #{target}")
  Grpc::Testing::TestService::Stub.new(target, :this_channel_is_insecure)
end
73
# Implements the LoadBalancerStatsService that the test runner queries
# for the distribution of RPC results across backends.
class TestTarget < Grpc::Testing::LoadBalancerStatsService::Service
  include Grpc::Testing

  # Registers a watcher, waits until the client loop has recorded
  # req['num_rpcs'] results on it or req['timeout_sec'] seconds have
  # elapsed, and returns what was recorded.
  #
  # req   - request supplying 'num_rpcs' and 'timeout_sec'.
  # _call - unused.
  #
  # Returns a LoadBalancerStatsResponse whose num_failures is the number
  # of RPCs without a remote peer plus the RPCs still outstanding when
  # the wait ended.
  def get_client_stats(req, _call)
    finish_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) +
                  req['timeout_sec']
    watcher = {
      'rpcs_by_peer' => Hash.new(0),
      'rpcs_needed' => req['num_rpcs'],
      'no_remote_peer' => 0
    }
    $watchers_mutex.synchronize do
      $watchers << watcher
      begin
        loop do
          seconds_remaining = finish_time -
                              Process.clock_gettime(Process::CLOCK_MONOTONIC)
          break unless watcher['rpcs_needed'] > 0 && seconds_remaining > 0
          $watchers_cv.wait($watchers_mutex, seconds_remaining)
        end
      ensure
        # Remove by object identity: Array#index compares hashes by
        # content, so two concurrent requests with equal contents could
        # otherwise remove each other's watcher. Doing it in ensure keeps
        # an interrupted wait from leaving the watcher registered.
        $watchers.delete_if { |registered| registered.equal?(watcher) }
      end
    end
    LoadBalancerStatsResponse.new(
      rpcs_by_peer: watcher['rpcs_by_peer'],
      num_failures: watcher['no_remote_peer'] + watcher['rpcs_needed']
    )
  end
end
104
# Sends one unary RPC roughly every target_seconds_between_rpcs seconds
# (1/qps) until $shutdown becomes true, recording each outcome on every
# stats watcher that is still waiting for results.
#
# stub - object responding to unary_call(req, deadline:) whose result
#        responds to #hostname.
# target_seconds_between_rpcs - desired spacing between RPC start times.
# fail_on_failed_rpcs - when true, a GRPC::BadStatus raised by the RPC is
#        re-raised to the caller instead of being recorded.
def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs)
  # Use the fully qualified name instead of `include Grpc::Testing`, which
  # at this scope would mix the module into Object on every call.
  req = Grpc::Testing::SimpleRequest.new
  target_next_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  until $shutdown
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    sleep_seconds = target_next_start - now
    if sleep_seconds < 0
      # Behind schedule: restart the schedule from the current time.
      target_next_start = now + target_seconds_between_rpcs
      GRPC.logger.info(
        format('ruby xds: warning, rpc takes too long to finish. ' \
               'Deficit = %.1fms. ' \
               'If you consistently see this, the qps is too high.',
               (sleep_seconds * 1000).abs.round(1)))
    else
      target_next_start += target_seconds_between_rpcs
      sleep(sleep_seconds)
    end
    begin
      deadline = GRPC::Core::TimeConsts.from_relative_time(30) # 30 seconds
      remote_peer = stub.unary_call(req, deadline: deadline).hostname
    rescue GRPC::BadStatus => e
      remote_peer = ''
      GRPC.logger.info("ruby xds: rpc failed:|#{e.message}|, " \
                       'this may or may not be expected')
      raise if fail_on_failed_rpcs
    end
    $watchers_mutex.synchronize do
      $watchers.each do |watcher|
        # A watcher that already has all the results it asked for may
        # still be registered until its waiting thread wakes up; do not
        # count further RPCs against it.
        next unless watcher['rpcs_needed'] > 0
        watcher['rpcs_needed'] -= 1
        if remote_peer.strip.empty?
          watcher['no_remote_peer'] += 1
        else
          watcher['rpcs_by_peer'][remote_peer] += 1
        end
      end
      $watchers_cv.broadcast
    end
  end
end
149
# Args holds the parsed command line settings, one member per flag.
Args = Struct.new(*%i[fail_on_failed_rpcs num_channels server stats_port qps])
153
# Parses the command line flags with OptionParser#parse! and returns them
# as an Args struct. fail_on_failed_rpcs defaults to false and
# num_channels to 1; server, stats_port and qps are stored as the strings
# given on the command line and stay nil when absent.
def parse_args
  parsed = Args.new
  parsed.fail_on_failed_rpcs = false
  parsed.num_channels = 1

  parser = OptionParser.new
  parser.on('--fail_on_failed_rpcs BOOL', ['false', 'true']) do |flag|
    parsed.fail_on_failed_rpcs = (flag == 'true')
  end
  parser.on('--num_channels CHANNELS', 'number of channels') do |count|
    parsed.num_channels = count.to_i
  end
  parser.on('--server SERVER_HOST', 'server hostname') do |host|
    GRPC.logger.info("ruby xds: server address is #{host}")
    parsed.server = host
  end
  parser.on('--stats_port STATS_PORT', 'stats port') do |port|
    GRPC.logger.info("ruby xds: stats port is #{port}")
    parsed.stats_port = port
  end
  parser.on('--qps QPS', 'qps') do |rate|
    GRPC.logger.info("ruby xds: qps is #{rate}")
    parsed.qps = rate
  end
  parser.parse!

  parsed
end
181
# Entry point: serves LoadBalancerStatsService on the stats port, runs the
# client RPC loops in background threads, and stops them once the server
# thread finishes.
def main
  opts = parse_args

  # This server hosts the LoadBalancerStatsService
  stats_server = GRPC::RpcServer.new
  stats_server.add_http2_port("0.0.0.0:#{opts['stats_port']}",
                              :this_port_is_insecure)
  stats_server.handle(TestTarget)
  server_thread = Thread.new do
    # run the server until the main test runner terminates this process
    stats_server.run_till_terminated_or_interrupted(['TERM'])
  end

  # Each client thread sends unary rpcs continuously at a regular
  # interval of 1/qps seconds; all threads share the same stub.
  stub = create_stub(opts)
  interval = 1.0 / opts['qps'].to_f
  client_threads = opts['num_channels'].times.map do
    Thread.new do
      run_test_loop(stub, interval, opts['fail_on_failed_rpcs'])
    end
  end

  server_thread.join
  $shutdown = true
  client_threads.each(&:join)
end
210
# Run main only when this file is executed directly.
main if __FILE__ == $PROGRAM_NAME
214