#!/usr/bin/env ruby

# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This is the xDS interop test Ruby client. This is meant to be run by
# the run_xds_tests.py test runner.
#
# Usage: $ tools/run_tests/run_xds_tests.py --test_case=... ...
#    --client_cmd="path/to/xds_client.rb --server=<hostname> \
#                                         --stats_port=<port> \
#                                         --qps=<qps>"

# These lines are required for the generated files to load grpc
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib')
pb_dir = File.dirname(this_dir)
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
$LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir)

require 'optparse'
require 'logger'

require_relative '../../lib/grpc'
require 'google/protobuf'

require_relative '../src/proto/grpc/testing/empty_pb'
require_relative '../src/proto/grpc/testing/messages_pb'
require_relative '../src/proto/grpc/testing/test_services_pb'

# Some global variables to be shared by server and client.
# $watchers holds one Hash per in-flight stats request; it is read and
# modified only while holding $watchers_mutex, and $watchers_cv is signalled
# after each RPC has been recorded. $shutdown tells the client loops to stop.
$watchers = []
$watchers_mutex = Mutex.new
$watchers_cv = ConditionVariable.new
$shutdown = false

# RubyLogger defines a logger for gRPC based on the standard ruby logger.
module RubyLogger
  LOGGER = Logger.new(STDOUT)
  LOGGER.level = Logger::INFO

  # Returns the shared logger, which writes to STDOUT at INFO level.
  def logger
    LOGGER
  end
end

# GRPC is the general RPC module
module GRPC
  # Make GRPC.logger return RubyLogger::LOGGER (STDOUT, INFO level).
  extend RubyLogger
end

# Creates a test stub connected insecurely to the configured server.
#
# @param opts [Args] parsed command line options; only `server` is read
# @return [Grpc::Testing::TestService::Stub]
def create_stub(opts)
  address = opts.server.to_s
  GRPC.logger.info("... connecting insecurely to #{address}")
  Grpc::Testing::TestService::Stub.new(address, :this_channel_is_insecure)
end

# This implements LoadBalancerStatsService required by the test runner
class TestTarget < Grpc::Testing::LoadBalancerStatsService::Service
  include Grpc::Testing

  # Registers a watcher and blocks until the client loop has recorded
  # req['num_rpcs'] further RPCs or req['timeout_sec'] seconds have elapsed,
  # whichever comes first.
  #
  # @param req request message; `num_rpcs` and `timeout_sec` are read
  # @param _call unused
  # @return [LoadBalancerStatsResponse] per-peer RPC counts, with
  #   num_failures = RPCs that had no remote peer + RPCs still outstanding
  def get_client_stats(req, _call)
    finish_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) +
                  req['timeout_sec']
    watcher = {
      'rpcs_by_peer' => Hash.new(0),
      'rpcs_needed' => req['num_rpcs'],
      'no_remote_peer' => 0
    }
    $watchers_mutex.synchronize do
      $watchers << watcher
      seconds_remaining = finish_time -
                          Process.clock_gettime(Process::CLOCK_MONOTONIC)
      # Re-check after every wake-up: the wait returns on each broadcast,
      # not only once enough RPCs have been counted.
      while watcher['rpcs_needed'] > 0 && seconds_remaining > 0
        $watchers_cv.wait($watchers_mutex, seconds_remaining)
        seconds_remaining = finish_time -
                            Process.clock_gettime(Process::CLOCK_MONOTONIC)
      end
      # Remove by identity. Array#index/#delete compare Hashes by content, so
      # with two concurrent watchers holding equal counters they could remove
      # the other request's watcher instead of this one.
      $watchers.delete_if { |w| w.equal?(watcher) }
    end
    LoadBalancerStatsResponse.new(
      rpcs_by_peer: watcher['rpcs_by_peer'],
      num_failures: watcher['no_remote_peer'] + watcher['rpcs_needed']
    )
  end
end

# Sends 1 rpc every 1/qps second until $shutdown is set, recording the
# outcome of each RPC in every registered watcher.
#
# @param stub test service stub used for `unary_call`
# @param target_seconds_between_rpcs [Float] desired interval between RPC
#   start times
# @param fail_on_failed_rpcs [Boolean] when true, a GRPC::BadStatus is
#   re-raised (terminating the calling thread) instead of being counted
# @raise [GRPC::BadStatus] only when fail_on_failed_rpcs is true
def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs)
  req = Grpc::Testing::SimpleRequest.new
  target_next_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  until $shutdown
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    sleep_seconds = target_next_start - now
    if sleep_seconds < 0
      # Behind schedule: restart the schedule from now rather than trying to
      # catch up with a burst of RPCs.
      target_next_start = now + target_seconds_between_rpcs
      GRPC.logger.info(
        "ruby xds: warning, rpc takes too long to finish. " \
        "Deficit = %.1fms. " \
        "If you consistently see this, the qps is too high." \
        % [(sleep_seconds * 1000).abs.round(1)])
    else
      target_next_start += target_seconds_between_rpcs
      sleep(sleep_seconds)
    end
    begin
      deadline = GRPC::Core::TimeConsts.from_relative_time(30) # 30 seconds
      resp = stub.unary_call(req, deadline: deadline)
      remote_peer = resp.hostname
    rescue GRPC::BadStatus => e
      # An empty peer marks this RPC as failed in the watcher bookkeeping.
      remote_peer = ""
      GRPC.logger.info("ruby xds: rpc failed:|#{e.message}|, " \
                       "this may or may not be expected")
      raise if fail_on_failed_rpcs
    end
    $watchers_mutex.synchronize do
      $watchers.each do |watcher|
        watcher['rpcs_needed'] -= 1
        if remote_peer.strip.empty?
          watcher['no_remote_peer'] += 1
        else
          watcher['rpcs_by_peer'][remote_peer] += 1
        end
      end
      $watchers_cv.broadcast
    end
  end
end

# Args is used to hold the command line info.
Args = Struct.new(:fail_on_failed_rpcs, :num_channels,
                  :server, :stats_port, :qps)

# Parses the command line options (consuming them from ARGV) and returns
# them as an Args struct.
#
# Defaults: fail_on_failed_rpcs = false, num_channels = 1. The server,
# stats_port and qps members stay nil unless given, and are stored as the
# raw option strings.
#
# @return [Args]
def parse_args
  args = Args.new
  args['fail_on_failed_rpcs'] = false
  args['num_channels'] = 1
  OptionParser.new do |opts|
    opts.on('--fail_on_failed_rpcs BOOL', ['false', 'true']) do |v|
      args['fail_on_failed_rpcs'] = v == 'true'
    end
    opts.on('--num_channels CHANNELS', 'number of channels') do |v|
      args['num_channels'] = v.to_i
    end
    opts.on('--server SERVER_HOST', 'server hostname') do |v|
      GRPC.logger.info("ruby xds: server address is #{v}")
      args['server'] = v
    end
    opts.on('--stats_port STATS_PORT', 'stats port') do |v|
      GRPC.logger.info("ruby xds: stats port is #{v}")
      args['stats_port'] = v
    end
    opts.on('--qps QPS', 'qps') do |v|
      GRPC.logger.info("ruby xds: qps is #{v}")
      args['qps'] = v
    end
  end.parse!
  args
end

# Starts the stats server and the client RPC loops, then blocks until the
# server thread finishes, after which the client loops are asked to stop.
#
# @raise [ArgumentError] if --qps is missing or not a positive number
def main
  opts = parse_args

  # Without a positive qps the RPC interval below would be infinite.
  qps = opts['qps'].to_f
  raise ArgumentError, '--qps must be a positive number' unless qps > 0

  # This server hosts the LoadBalancerStatsService
  host = "0.0.0.0:#{opts['stats_port']}"
  s = GRPC::RpcServer.new
  s.add_http2_port(host, :this_port_is_insecure)
  s.handle(TestTarget)
  server_thread = Thread.new do
    # run the server until the main test runner terminates this process
    s.run_till_terminated_or_interrupted(['TERM'])
  end

  # The client just sends unary rpcs continuously in a regular interval.
  # NOTE(review): every client thread shares this single stub, so
  # --num_channels controls the number of sender threads here — confirm
  # whether one stub per thread is intended.
  stub = create_stub(opts)
  target_seconds_between_rpcs = 1.0 / qps
  client_threads = []
  opts['num_channels'].times do
    client_threads << Thread.new do
      run_test_loop(stub, target_seconds_between_rpcs,
                    opts['fail_on_failed_rpcs'])
    end
  end

  server_thread.join
  $shutdown = true
  client_threads.each(&:join)
end

if __FILE__ == $0
  main
end