• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env ruby
2
3# Copyright 2015 gRPC authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17# This is the xDS interop test Ruby client. This is meant to be run by
18# the run_xds_tests.py test runner.
19#
20# Usage: $ tools/run_tests/run_xds_tests.py --test_case=... ...
21#    --client_cmd="path/to/xds_client.rb --server=<hostname> \
22#                                        --stats_port=<port> \
23#                                        --qps=<qps>"
24
25# These lines are required for the generated files to load grpc
26this_dir = File.expand_path(File.dirname(__FILE__))
27lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib')
28pb_dir = File.dirname(this_dir)
29$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
30$LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir)
31
32require 'optparse'
33require 'logger'
34
35require_relative '../../lib/grpc'
36require 'google/protobuf'
37
38require_relative '../src/proto/grpc/testing/empty_pb'
39require_relative '../src/proto/grpc/testing/messages_pb'
40require_relative '../src/proto/grpc/testing/test_services_pb'
41
42class RpcConfig
43  def init(rpcs_to_send, metadata_to_send)
44    @rpcs_to_send = rpcs_to_send
45    @metadata_to_send = metadata_to_send
46  end
47  def rpcs_to_send
48    @rpcs_to_send
49  end
50  def metadata_to_send
51    @metadata_to_send
52  end
53end
54
55# Some global constant mappings
56$RPC_MAP = {
57  'UnaryCall' => :UNARY_CALL,
58  'EmptyCall' => :EMPTY_CALL,
59}
60
61# Some global variables to be shared by server and client
62$watchers = Array.new
63$watchers_mutex = Mutex.new
64$watchers_cv = ConditionVariable.new
65$shutdown = false
66# These can be configured by the test runner dynamically
67$rpc_config = RpcConfig.new
68$rpc_config.init([:UNARY_CALL], {})
69# These stats are shared across threads
70$accumulated_stats_mu = Mutex.new
71$num_rpcs_started_by_method = {}
72$num_rpcs_succeeded_by_method = {}
73$num_rpcs_failed_by_method = {}
74
75# RubyLogger defines a logger for gRPC based on the standard ruby logger.
76module RubyLogger
77  def logger
78    LOGGER
79  end
80
81  LOGGER = Logger.new(STDOUT)
82  LOGGER.level = Logger::INFO
83end
84
85# GRPC is the general RPC module
86module GRPC
87  # Inject the noop #logger if no module-level logger method has been injected.
88  extend RubyLogger
89end
90
91# creates a test stub
92def create_stub(opts)
93  address = "#{opts.server}"
94  GRPC.logger.info("... connecting insecurely to #{address}")
95  Grpc::Testing::TestService::Stub.new(
96    address,
97    :this_channel_is_insecure,
98  )
99end
100
101class ConfigureTarget < Grpc::Testing::XdsUpdateClientConfigureService::Service
102  include Grpc::Testing
103
104  def configure(req, _call)
105    rpcs_to_send = req['types'];
106    metadata_to_send = {}
107    req['metadata'].each do |m|
108      rpc = m.type
109      if !metadata_to_send.key?(rpc)
110        metadata_to_send[rpc] = {}
111      end
112      metadata_key = m.key
113      metadata_value = m.value
114      metadata_to_send[rpc][metadata_key] = metadata_value
115    end
116    GRPC.logger.info("Configuring new rpcs_to_send and metadata_to_send...")
117    GRPC.logger.info(rpcs_to_send)
118    GRPC.logger.info(metadata_to_send)
119    new_rpc_config = RpcConfig.new
120    new_rpc_config.init(rpcs_to_send, metadata_to_send)
121    $rpc_config = new_rpc_config
122    ClientConfigureResponse.new();
123  end
124end
125
126# This implements LoadBalancerStatsService required by the test runner
127class TestTarget < Grpc::Testing::LoadBalancerStatsService::Service
128  include Grpc::Testing
129
130  def get_client_stats(req, _call)
131    finish_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) +
132                  req['timeout_sec']
133    watcher = {}
134    $watchers_mutex.synchronize do
135      watcher = {
136        "rpcs_by_method" => Hash.new(),
137        "rpcs_by_peer" => Hash.new(0),
138        "rpcs_needed" => req['num_rpcs'],
139        "no_remote_peer" => 0
140      }
141      $watchers << watcher
142      seconds_remaining = finish_time -
143                          Process.clock_gettime(Process::CLOCK_MONOTONIC)
144      while watcher['rpcs_needed'] > 0 && seconds_remaining > 0
145        $watchers_cv.wait($watchers_mutex, seconds_remaining)
146        seconds_remaining = finish_time -
147                            Process.clock_gettime(Process::CLOCK_MONOTONIC)
148      end
149      $watchers.delete_at($watchers.index(watcher))
150    end
151    # convert results into proper proto object
152    rpcs_by_method = {}
153    watcher['rpcs_by_method'].each do |rpc_name, rpcs_by_peer|
154      rpcs_by_method[rpc_name] = LoadBalancerStatsResponse::RpcsByPeer.new(
155        rpcs_by_peer: rpcs_by_peer
156      )
157    end
158    LoadBalancerStatsResponse.new(
159      rpcs_by_method: rpcs_by_method,
160      rpcs_by_peer: watcher['rpcs_by_peer'],
161      num_failures: watcher['no_remote_peer'] + watcher['rpcs_needed']
162    );
163  end
164
165  def get_client_accumulated_stats(req, _call)
166    $accumulated_stats_mu.synchronize do
167      LoadBalancerAccumulatedStatsResponse.new(
168        num_rpcs_started_by_method: $num_rpcs_started_by_method,
169        num_rpcs_succeeded_by_method: $num_rpcs_succeeded_by_method,
170        num_rpcs_failed_by_method: $num_rpcs_failed_by_method
171      )
172    end
173  end
174end
175
176# execute 1 RPC and return remote hostname
177def execute_rpc(op, fail_on_failed_rpcs, rpc_stats_key)
178  remote_peer = ""
179  begin
180    op.execute
181    if op.metadata.key?('hostname')
182      remote_peer = op.metadata['hostname']
183    end
184  rescue GRPC::BadStatus => e
185    if fail_on_failed_rpcs
186      raise e
187    end
188  end
189  $accumulated_stats_mu.synchronize do
190    if remote_peer.empty?
191      $num_rpcs_failed_by_method[rpc_stats_key] += 1
192    else
193      $num_rpcs_succeeded_by_method[rpc_stats_key] += 1
194    end
195  end
196  remote_peer
197end
198
199def execute_rpc_in_thread(op, rpc_stats_key)
200  Thread.new {
201    begin
202      op.execute
203      # The following should _not_ happen with the current spec
204      # because we are only executing RPCs in a thread if we expect it
205      # to be kept open, or deadline_exceeded, or dropped by the load
206      # balancing policy. These RPCs should not complete successfully.
207      # Doing this for consistency
208      $accumulated_stats_mu.synchronize do
209        $num_rpcs_succeeded_by_method[rpc_stats_key] += 1
210      end
211    rescue GRPC::BadStatus => e
212      # Normal execution arrives here,
213      # either because of deadline_exceeded or "call dropped by load
214      # balancing policy"
215      $accumulated_stats_mu.synchronize do
216        $num_rpcs_failed_by_method[rpc_stats_key] += 1
217      end
218    end
219  }
220end
221
222# send 1 rpc every 1/qps second
223def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs)
224  include Grpc::Testing
225  simple_req = SimpleRequest.new()
226  empty_req = Empty.new()
227  target_next_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
228  # Some RPCs are meant to be "kept open". Since Ruby does not have an
229  # async API, we are executing those RPCs in a thread so that they don't
230  # block.
231  keep_open_threads = Array.new
232  while !$shutdown
233    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
234    sleep_seconds = target_next_start - now
235    if sleep_seconds < 0
236      target_next_start = now + target_seconds_between_rpcs
237    else
238      target_next_start += target_seconds_between_rpcs
239      sleep(sleep_seconds)
240    end
241    deadline = GRPC::Core::TimeConsts::from_relative_time(30) # 30 seconds
242    results = {}
243    $rpc_config.rpcs_to_send.each do |rpc|
244      # rpc is in the form of :UNARY_CALL or :EMPTY_CALL here
245      metadata = $rpc_config.metadata_to_send.key?(rpc) ?
246                   $rpc_config.metadata_to_send[rpc] : {}
247      $accumulated_stats_mu.synchronize do
248        $num_rpcs_started_by_method[rpc.to_s] += 1
249        num_started = $num_rpcs_started_by_method[rpc.to_s]
250        if num_started % 100 == 0
251          GRPC.logger.info("Started #{num_started} of #{rpc}")
252        end
253      end
254      if rpc == :UNARY_CALL
255        op = stub.unary_call(simple_req,
256                             metadata: metadata,
257                             deadline: deadline,
258                             return_op: true)
259      elsif rpc == :EMPTY_CALL
260        op = stub.empty_call(empty_req,
261                             metadata: metadata,
262                             deadline: deadline,
263                             return_op: true)
264      else
265        raise "Unsupported rpc #{rpc}"
266      end
267      rpc_stats_key = rpc.to_s
268      if metadata.key?('rpc-behavior') and
269        (metadata['rpc-behavior'] == 'keep-open')
270        num_open_threads = keep_open_threads.size
271        if num_open_threads % 50 == 0
272          GRPC.logger.info("number of keep_open_threads = #{num_open_threads}")
273        end
274        keep_open_threads << execute_rpc_in_thread(op, rpc_stats_key)
275      else
276        results[rpc] = execute_rpc(op, fail_on_failed_rpcs, rpc_stats_key)
277      end
278    end
279    $watchers_mutex.synchronize do
280      $watchers.each do |watcher|
281        # this is counted once when each group of all rpcs_to_send were done
282        watcher['rpcs_needed'] -= 1
283        results.each do |rpc_name, remote_peer|
284          # These stats expect rpc_name to be in the form of
285          # UnaryCall or EmptyCall, not the underscore-case all-caps form
286          rpc_name = $RPC_MAP.invert()[rpc_name]
287          if remote_peer.strip.empty?
288            # error is counted per individual RPC
289            watcher['no_remote_peer'] += 1
290          else
291            if not watcher['rpcs_by_method'].key?(rpc_name)
292              watcher['rpcs_by_method'][rpc_name] = Hash.new(0)
293            end
294            # increment the remote hostname distribution histogram
295            # both by overall, and broken down per RPC
296            watcher['rpcs_by_method'][rpc_name][remote_peer] +=  1
297            watcher['rpcs_by_peer'][remote_peer] += 1
298          end
299        end
300      end
301      $watchers_cv.broadcast
302    end
303  end
304  keep_open_threads.each { |thd| thd.join }
305end
306
307# Args is used to hold the command line info.
308Args = Struct.new(:fail_on_failed_rpcs, :num_channels,
309                  :rpc, :metadata,
310                  :server, :stats_port, :qps)
311
312# validates the command line options, returning them as a Hash.
313def parse_args
314  args = Args.new
315  args['fail_on_failed_rpcs'] = false
316  args['num_channels'] = 1
317  args['rpc'] = 'UnaryCall'
318  args['metadata'] = ''
319  OptionParser.new do |opts|
320    opts.on('--fail_on_failed_rpcs BOOL', ['false', 'true']) do |v|
321      args['fail_on_failed_rpcs'] = v == 'true'
322    end
323    opts.on('--num_channels CHANNELS', 'number of channels') do |v|
324      args['num_channels'] = v.to_i
325    end
326    opts.on('--rpc RPCS_TO_SEND', 'list of RPCs to send') do |v|
327      args['rpc'] = v
328    end
329    opts.on('--metadata METADATA_TO_SEND', 'metadata to send per RPC') do |v|
330      args['metadata'] = v
331    end
332    opts.on('--server SERVER_HOST', 'server hostname') do |v|
333      GRPC.logger.info("ruby xds: server address is #{v}")
334      args['server'] = v
335    end
336    opts.on('--stats_port STATS_PORT', 'stats port') do |v|
337      GRPC.logger.info("ruby xds: stats port is #{v}")
338      args['stats_port'] = v
339    end
340    opts.on('--qps QPS', 'qps') do |v|
341      GRPC.logger.info("ruby xds: qps is #{v}")
342      args['qps'] = v
343    end
344  end.parse!
345  args
346end
347
348def main
349  opts = parse_args
350
351  # This server hosts the LoadBalancerStatsService
352  host = "0.0.0.0:#{opts['stats_port']}"
353  s = GRPC::RpcServer.new
354  s.add_http2_port(host, :this_port_is_insecure)
355  s.handle(TestTarget)
356  s.handle(ConfigureTarget)
357  server_thread = Thread.new {
358    # run the server until the main test runner terminates this process
359    s.run_till_terminated_or_interrupted(['TERM'])
360  }
361
362  # Initialize stats
363  $RPC_MAP.values.each do |rpc|
364    $num_rpcs_started_by_method[rpc.to_s] = 0
365    $num_rpcs_succeeded_by_method[rpc.to_s] = 0
366    $num_rpcs_failed_by_method[rpc.to_s] = 0
367  end
368
369  # The client just sends rpcs continuously in a regular interval
370  stub = create_stub(opts)
371  target_seconds_between_rpcs = (1.0 / opts['qps'].to_f)
372  # Convert 'metadata' input in the form of
373  #   rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3
374  # into
375  #   {
376  #     'rpc1' => {
377  #       'k1' => 'v1',
378  #       'k3' => 'v3',
379  #     },
380  #     'rpc2' => {
381  #       'k2' => 'v2'
382  #     },
383  #   }
384  rpcs_to_send = []
385  metadata_to_send = {}
386  if opts['metadata']
387    metadata_entries = opts['metadata'].split(',')
388    metadata_entries.each do |e|
389      (rpc_name, metadata_key, metadata_value) = e.split(':')
390      rpc_name = $RPC_MAP[rpc_name]
391      # initialize if we haven't seen this rpc_name yet
392      if !metadata_to_send.key?(rpc_name)
393        metadata_to_send[rpc_name] = {}
394      end
395      metadata_to_send[rpc_name][metadata_key] = metadata_value
396    end
397  end
398  if opts['rpc']
399    rpcs_to_send = opts['rpc'].split(',')
400  end
401  if rpcs_to_send.size > 0
402    rpcs_to_send.map! { |rpc| $RPC_MAP[rpc] }
403    new_rpc_config = RpcConfig.new
404    new_rpc_config.init(rpcs_to_send, metadata_to_send)
405    $rpc_config = new_rpc_config
406  end
407  client_threads = Array.new
408  opts['num_channels'].times {
409    client_threads << Thread.new {
410      run_test_loop(stub, target_seconds_between_rpcs,
411                    opts['fail_on_failed_rpcs'])
412    }
413  }
414
415  server_thread.join
416  $shutdown = true
417  client_threads.each { |thd| thd.join }
418end
419
420if __FILE__ == $0
421  main
422end
423