#!/usr/bin/env ruby

# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This is the xDS interop test Ruby client. This is meant to be run by
# the run_xds_tests.py test runner.
#
# Usage: $ tools/run_tests/run_xds_tests.py --test_case=... ...
#    --client_cmd="path/to/xds_client.rb --server=<hostname> \
#                  --stats_port=<port> \
#                  --qps=<qps>"

# These lines are required for the generated files to load grpc
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib')
pb_dir = File.dirname(this_dir)
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
$LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir)

require 'optparse'
require 'logger'

require_relative '../../lib/grpc'
require 'google/protobuf'

require_relative '../src/proto/grpc/testing/empty_pb'
require_relative '../src/proto/grpc/testing/messages_pb'
require_relative '../src/proto/grpc/testing/test_services_pb'

# Holds the list of RPC types the client sends on every tick and the metadata
# attached to each RPC type. Instances are created with RpcConfig.new and then
# populated through #init.
class RpcConfig
  attr_reader :rpcs_to_send, :metadata_to_send

  # rpcs_to_send: list of RPC types (e.g. :UNARY_CALL)
  # metadata_to_send: hash of RPC type => { metadata key => metadata value }
  def init(rpcs_to_send, metadata_to_send)
    @rpcs_to_send = rpcs_to_send
    @metadata_to_send = metadata_to_send
  end
end

# Some global constant mappings
# Maps the RPC names used on the command line and in the stats responses to
# the RPC type symbols used internally.
$RPC_MAP = {
  'UnaryCall' => :UNARY_CALL,
  'EmptyCall' => :EMPTY_CALL,
}.freeze

# Some global variables to be shared by server and client
$watchers = []
$watchers_mutex = Mutex.new
$watchers_cv = ConditionVariable.new
$shutdown = false
# These can be configured by the test runner dynamically
$rpc_config = RpcConfig.new
$rpc_config.init([:UNARY_CALL], {})
# These stats are shared across threads
$accumulated_stats_mu = Mutex.new
$num_rpcs_started_by_method = {}
$num_rpcs_succeeded_by_method = {}
$num_rpcs_failed_by_method = {}

# RubyLogger defines a logger for gRPC based on the standard ruby logger.
module RubyLogger
  def logger
    LOGGER
  end

  LOGGER = Logger.new(STDOUT)
  LOGGER.level = Logger::INFO
end

# GRPC is the general RPC module
module GRPC
  # Inject the noop #logger if no module-level logger method has been injected.
  extend RubyLogger
end

# Creates a test stub connected (insecurely) to opts.server.
def create_stub(opts)
  address = opts.server.to_s
  GRPC.logger.info("... connecting insecurely to #{address}")
  Grpc::Testing::TestService::Stub.new(
    address,
    :this_channel_is_insecure
  )
end

# This implements XdsUpdateClientConfigureService, which replaces the global
# $rpc_config with the RPC types and metadata carried in the request.
class ConfigureTarget < Grpc::Testing::XdsUpdateClientConfigureService::Service
  include Grpc::Testing

  # Builds a new RpcConfig from req['types'] and req['metadata'] and installs
  # it as $rpc_config. Returns an empty ClientConfigureResponse.
  def configure(req, _call)
    rpcs_to_send = req['types']
    # Group the metadata entries by the RPC type they apply to.
    metadata_to_send = {}
    req['metadata'].each do |m|
      (metadata_to_send[m.type] ||= {})[m.key] = m.value
    end
    GRPC.logger.info("Configuring new rpcs_to_send and metadata_to_send...")
    GRPC.logger.info(rpcs_to_send)
    GRPC.logger.info(metadata_to_send)
    new_rpc_config = RpcConfig.new
    new_rpc_config.init(rpcs_to_send, metadata_to_send)
    $rpc_config = new_rpc_config
    ClientConfigureResponse.new
  end
end

# This implements LoadBalancerStatsService required by the test runner
class TestTarget < Grpc::Testing::LoadBalancerStatsService::Service
  include Grpc::Testing

  # Registers a watcher in $watchers and blocks until the client loops have
  # counted req['num_rpcs'] ticks against it or req['timeout_sec'] seconds
  # have elapsed, then returns the per-peer and per-method distribution the
  # watcher collected as a LoadBalancerStatsResponse.
  def get_client_stats(req, _call)
    finish_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) +
                  req['timeout_sec']
    watcher = {
      "rpcs_by_method" => {},
      "rpcs_by_peer" => Hash.new(0),
      "rpcs_needed" => req['num_rpcs'],
      "no_remote_peer" => 0
    }
    $watchers_mutex.synchronize do
      $watchers << watcher
      seconds_remaining = finish_time -
                          Process.clock_gettime(Process::CLOCK_MONOTONIC)
      while watcher['rpcs_needed'] > 0 && seconds_remaining > 0
        $watchers_cv.wait($watchers_mutex, seconds_remaining)
        seconds_remaining = finish_time -
                            Process.clock_gettime(Process::CLOCK_MONOTONIC)
      end
      # Remove this exact watcher object. Looking it up with Array#index would
      # compare Hash contents, so two concurrent requests holding equal
      # watchers could remove each other's entry and later hit a nil index.
      $watchers.delete_if { |w| w.equal?(watcher) }
    end
    # convert results into proper proto object
    rpcs_by_method = {}
    watcher['rpcs_by_method'].each do |rpc_name, rpcs_by_peer|
      rpcs_by_method[rpc_name] = LoadBalancerStatsResponse::RpcsByPeer.new(
        rpcs_by_peer: rpcs_by_peer
      )
    end
    LoadBalancerStatsResponse.new(
      rpcs_by_method: rpcs_by_method,
      rpcs_by_peer: watcher['rpcs_by_peer'],
      num_failures: watcher['no_remote_peer'] + watcher['rpcs_needed']
    )
  end

  # Returns the started/succeeded/failed counters accumulated since startup.
  def get_client_accumulated_stats(req, _call)
    $accumulated_stats_mu.synchronize do
      LoadBalancerAccumulatedStatsResponse.new(
        num_rpcs_started_by_method: $num_rpcs_started_by_method,
        num_rpcs_succeeded_by_method: $num_rpcs_succeeded_by_method,
        num_rpcs_failed_by_method: $num_rpcs_failed_by_method
      )
    end
  end
end

# Executes 1 RPC and returns the remote hostname taken from the 'hostname'
# response metadata, or "" when the RPC failed or carried no hostname.
# A GRPC::BadStatus is re-raised only when fail_on_failed_rpcs is set.
# The accumulated succeeded/failed counter for rpc_stats_key is updated.
def execute_rpc(op, fail_on_failed_rpcs, rpc_stats_key)
  remote_peer = ""
  begin
    op.execute
    remote_peer = op.metadata['hostname'] if op.metadata.key?('hostname')
  rescue GRPC::BadStatus
    raise if fail_on_failed_rpcs
  end
  $accumulated_stats_mu.synchronize do
    if remote_peer.empty?
      $num_rpcs_failed_by_method[rpc_stats_key] += 1
    else
      $num_rpcs_succeeded_by_method[rpc_stats_key] += 1
    end
  end
  remote_peer
end

# Executes 1 RPC in a new thread and returns that thread. The accumulated
# succeeded/failed counter for rpc_stats_key is updated when the RPC ends.
def execute_rpc_in_thread(op, rpc_stats_key)
  Thread.new do
    begin
      op.execute
      # The following should _not_ happen with the current spec
      # because we are only executing RPCs in a thread if we expect it
      # to be kept open, or deadline_exceeded, or dropped by the load
      # balancing policy. These RPCs should not complete successfully.
      # Doing this for consistency
      $accumulated_stats_mu.synchronize do
        $num_rpcs_succeeded_by_method[rpc_stats_key] += 1
      end
    rescue GRPC::BadStatus
      # Normal execution arrives here,
      # either because of deadline_exceeded or "call dropped by load
      # balancing policy"
      $accumulated_stats_mu.synchronize do
        $num_rpcs_failed_by_method[rpc_stats_key] += 1
      end
    end
  end
end

# Sends one group of RPCs (every type in $rpc_config) each
# target_seconds_between_rpcs seconds until $shutdown is set, and records the
# outcome of each group in every registered watcher.
def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs)
  include Grpc::Testing
  simple_req = SimpleRequest.new
  empty_req = Empty.new
  # RPC type symbol => stats name ('UnaryCall' / 'EmptyCall'); built once
  # instead of re-inverting the map for every watcher and result.
  rpc_names = $RPC_MAP.invert
  target_next_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  # Some RPCs are meant to be "kept open". Since Ruby does not have an
  # async API, we are executing those RPCs in a thread so that they don't
  # block.
  keep_open_threads = []
  until $shutdown
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    sleep_seconds = target_next_start - now
    if sleep_seconds < 0
      # We are behind schedule: restart the schedule from now.
      target_next_start = now + target_seconds_between_rpcs
    else
      target_next_start += target_seconds_between_rpcs
      sleep(sleep_seconds)
    end
    deadline = GRPC::Core::TimeConsts::from_relative_time(30) # 30 seconds
    results = {}
    # Read the global once per tick so the RPC list and the metadata always
    # come from the same RpcConfig, even if it is replaced concurrently.
    rpc_config = $rpc_config
    rpc_config.rpcs_to_send.each do |rpc|
      # rpc is in the form of :UNARY_CALL or :EMPTY_CALL here
      metadata = rpc_config.metadata_to_send.fetch(rpc, {})
      rpc_stats_key = rpc.to_s
      $accumulated_stats_mu.synchronize do
        $num_rpcs_started_by_method[rpc_stats_key] += 1
        num_started = $num_rpcs_started_by_method[rpc_stats_key]
        if num_started % 100 == 0
          GRPC.logger.info("Started #{num_started} of #{rpc}")
        end
      end
      op = case rpc
           when :UNARY_CALL
             stub.unary_call(simple_req,
                             metadata: metadata,
                             deadline: deadline,
                             return_op: true)
           when :EMPTY_CALL
             stub.empty_call(empty_req,
                             metadata: metadata,
                             deadline: deadline,
                             return_op: true)
           else
             raise "Unsupported rpc #{rpc}"
           end
      if metadata.key?('rpc-behavior') &&
         metadata['rpc-behavior'] == 'keep-open'
        num_open_threads = keep_open_threads.size
        if num_open_threads % 50 == 0
          GRPC.logger.info("number of keep_open_threads = #{num_open_threads}")
        end
        keep_open_threads << execute_rpc_in_thread(op, rpc_stats_key)
      else
        results[rpc] = execute_rpc(op, fail_on_failed_rpcs, rpc_stats_key)
      end
    end
    $watchers_mutex.synchronize do
      $watchers.each do |watcher|
        # A watcher that already reached zero is only waiting to reacquire the
        # mutex and remove itself; counting another tick against it would
        # drive rpcs_needed negative and skew the reported num_failures.
        next unless watcher['rpcs_needed'] > 0

        # this is counted once when each group of all rpcs_to_send were done
        watcher['rpcs_needed'] -= 1
        results.each do |rpc, remote_peer|
          # These stats expect rpc_name to be in the form of
          # UnaryCall or EmptyCall, not the underscore-case all-caps form
          rpc_name = rpc_names[rpc]
          if remote_peer.strip.empty?
            # error is counted per individual RPC
            watcher['no_remote_peer'] += 1
          else
            # increment the remote hostname distribution histogram
            # both by overall, and broken down per RPC
            watcher['rpcs_by_method'][rpc_name] ||= Hash.new(0)
            watcher['rpcs_by_method'][rpc_name][remote_peer] += 1
            watcher['rpcs_by_peer'][remote_peer] += 1
          end
        end
      end
      $watchers_cv.broadcast
    end
  end
  keep_open_threads.each(&:join)
end

# Args is used to hold the command line info.
Args = Struct.new(:fail_on_failed_rpcs, :num_channels,
                  :rpc, :metadata,
                  :server, :stats_port, :qps)

# Parses the command line options, returning them as an Args struct.
# Defaults: fail_on_failed_rpcs=false, num_channels=1, rpc='UnaryCall',
# metadata=''. server, stats_port and qps have no default.
def parse_args
  args = Args.new
  args['fail_on_failed_rpcs'] = false
  args['num_channels'] = 1
  args['rpc'] = 'UnaryCall'
  args['metadata'] = ''
  OptionParser.new do |opts|
    opts.on('--fail_on_failed_rpcs BOOL', ['false', 'true']) do |v|
      args['fail_on_failed_rpcs'] = v == 'true'
    end
    opts.on('--num_channels CHANNELS', 'number of channels') do |v|
      args['num_channels'] = v.to_i
    end
    opts.on('--rpc RPCS_TO_SEND', 'list of RPCs to send') do |v|
      args['rpc'] = v
    end
    opts.on('--metadata METADATA_TO_SEND', 'metadata to send per RPC') do |v|
      args['metadata'] = v
    end
    opts.on('--server SERVER_HOST', 'server hostname') do |v|
      GRPC.logger.info("ruby xds: server address is #{v}")
      args['server'] = v
    end
    opts.on('--stats_port STATS_PORT', 'stats port') do |v|
      GRPC.logger.info("ruby xds: stats port is #{v}")
      args['stats_port'] = v
    end
    opts.on('--qps QPS', 'qps') do |v|
      GRPC.logger.info("ruby xds: qps is #{v}")
      args['qps'] = v
    end
  end.parse!
  args
end

# Converts a comma-separated list of RPC names ("UnaryCall,EmptyCall") into
# the matching RPC type symbols. Raises ArgumentError for a name that is not
# in $RPC_MAP, so a typo fails at startup instead of inside the client threads.
def parse_rpcs_to_send(rpc_arg)
  rpc_arg.to_s.split(',').map do |name|
    $RPC_MAP.fetch(name) { raise ArgumentError, "Unsupported rpc #{name}" }
  end
end

# Converts 'metadata' input in the form of
#   rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3
# into
#   {
#     :RPC1 => {
#       'k1' => 'v1',
#       'k3' => 'v3',
#     },
#     :RPC2 => {
#       'k2' => 'v2'
#     },
#   }
# where the outer keys are the RPC type symbols from $RPC_MAP. Entries naming
# an unknown RPC are skipped with a warning.
def parse_metadata_to_send(metadata_arg)
  metadata_to_send = {}
  metadata_arg.to_s.split(',').each do |entry|
    # Limit the split to 3 fields so a value containing ':' stays intact.
    rpc_name, metadata_key, metadata_value = entry.split(':', 3)
    rpc = $RPC_MAP[rpc_name]
    if rpc.nil?
      GRPC.logger.warn("ruby xds: ignoring metadata for unknown rpc #{rpc_name}")
      next
    end
    (metadata_to_send[rpc] ||= {})[metadata_key] = metadata_value
  end
  metadata_to_send
end

# Starts the stats/configure server, then sends RPCs from
# opts['num_channels'] client threads until the server is terminated.
def main
  opts = parse_args
  qps = opts['qps'].to_f
  # Without a positive qps the send interval would be 1.0 / 0.0.
  raise ArgumentError, '--qps must be a positive number' unless qps > 0

  # This server hosts the LoadBalancerStatsService
  host = "0.0.0.0:#{opts['stats_port']}"
  s = GRPC::RpcServer.new
  s.add_http2_port(host, :this_port_is_insecure)
  s.handle(TestTarget)
  s.handle(ConfigureTarget)
  server_thread = Thread.new do
    # run the server until the main test runner terminates this process
    s.run_till_terminated_or_interrupted(['TERM'])
  end

  # Initialize stats
  $RPC_MAP.values.each do |rpc|
    $num_rpcs_started_by_method[rpc.to_s] = 0
    $num_rpcs_succeeded_by_method[rpc.to_s] = 0
    $num_rpcs_failed_by_method[rpc.to_s] = 0
  end

  # The client just sends rpcs continuously in a regular interval
  stub = create_stub(opts)
  target_seconds_between_rpcs = 1.0 / qps
  metadata_to_send = parse_metadata_to_send(opts['metadata'])
  rpcs_to_send = parse_rpcs_to_send(opts['rpc'])
  # An empty --rpc list keeps the default configuration.
  unless rpcs_to_send.empty?
    new_rpc_config = RpcConfig.new
    new_rpc_config.init(rpcs_to_send, metadata_to_send)
    $rpc_config = new_rpc_config
  end
  client_threads = Array.new(opts['num_channels']) do
    Thread.new do
      run_test_loop(stub, target_seconds_between_rpcs,
                    opts['fail_on_failed_rpcs'])
    end
  end

  server_thread.join
  $shutdown = true
  client_threads.each(&:join)
end

if __FILE__ == $0
  main
end