• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env ruby
2
3# Copyright 2016 gRPC authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17# Worker and worker service implementation
18
19this_dir = File.expand_path(File.dirname(__FILE__))
20lib_dir = File.join(File.dirname(this_dir), 'lib')
21$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
22$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
23
24require 'grpc'
25require 'histogram'
26require 'src/proto/grpc/testing/benchmark_service_services_pb'
27
28class Poisson
29  def interarrival
30    @lambda_recip * (-Math.log(1.0-rand))
31  end
32  def advance
33    t = @next_time
34    @next_time += interarrival
35    t
36  end
37  def initialize(lambda)
38    @lambda_recip = 1.0/lambda
39    @next_time = Time.now + interarrival
40  end
41end
42
43class BenchmarkClient
44  def initialize(config)
45    opts = {}
46    if config.security_params
47      if config.security_params.use_test_ca
48        certs = load_test_certs
49        cred = GRPC::Core::ChannelCredentials.new(certs[0])
50      else
51        cred = GRPC::Core::ChannelCredentials.new()
52      end
53      if config.security_params.server_host_override
54        channel_args = {}
55        channel_args[GRPC::Core::Channel::SSL_TARGET] =
56          config.security_params.server_host_override
57        opts[:channel_args] = channel_args
58      end
59    else
60      cred = :this_channel_is_insecure
61    end
62    @histres = config.histogram_params.resolution
63    @histmax = config.histogram_params.max_possible
64    @start_time = Time.now
65    @histogram = Histogram.new(@histres, @histmax)
66    @done = false
67
68    gtsr = Grpc::Testing::SimpleRequest
69    gtpt = Grpc::Testing::PayloadType
70    gtp = Grpc::Testing::Payload
71    simple_params = config.payload_config.simple_params
72    req = gtsr.new(response_type: gtpt::COMPRESSABLE,
73                   response_size: simple_params.resp_size,
74                   payload: gtp.new(type: gtpt::COMPRESSABLE,
75                                    body: nulls(simple_params.req_size)))
76
77    @child_threads = []
78
79    (0..config.client_channels-1).each do |chan|
80      gtbss = Grpc::Testing::BenchmarkService::Stub
81      st = config.server_targets
82      stub = gtbss.new(st[chan % st.length], cred, **opts)
83      (0..config.outstanding_rpcs_per_channel-1).each do |r|
84        @child_threads << Thread.new {
85          case config.load_params.load.to_s
86          when 'closed_loop'
87            waiter = nil
88          when 'poisson'
89            waiter = Poisson.new(config.load_params.poisson.offered_load /
90                                 (config.client_channels *
91                                  config.outstanding_rpcs_per_channel))
92          end
93          case config.rpc_type
94          when :UNARY
95            unary_ping_ponger(req,stub,config,waiter)
96          when :STREAMING
97            streaming_ping_ponger(req,stub,config,waiter)
98          end
99        }
100      end
101    end
102  end
103  def wait_to_issue(waiter)
104    if waiter
105      delay = waiter.advance-Time.now
106      sleep delay if delay > 0
107    end
108  end
109  def unary_ping_ponger(req, stub, config,waiter)
110    while !@done
111      wait_to_issue(waiter)
112      start = Time.now
113      resp = stub.unary_call(req)
114      @histogram.add((Time.now-start)*1e9)
115    end
116  end
117  def streaming_ping_ponger(req, stub, config, waiter)
118    q = EnumeratorQueue.new(self)
119    resp = stub.streaming_call(q.each_item)
120    start = Time.now
121    q.push(req)
122    pushed_sentinal = false
123    resp.each do |r|
124      @histogram.add((Time.now-start)*1e9)
125      if !@done
126        wait_to_issue(waiter)
127        start = Time.now
128        q.push(req)
129      else
130        q.push(self) unless pushed_sentinal
131	# Continue polling on the responses to consume and release resources
132        pushed_sentinal = true
133      end
134    end
135  end
136  def mark(reset)
137    lat = Grpc::Testing::HistogramData.new(
138      bucket: @histogram.contents,
139      min_seen: @histogram.minimum,
140      max_seen: @histogram.maximum,
141      sum: @histogram.sum,
142      sum_of_squares: @histogram.sum_of_squares,
143      count: @histogram.count
144    )
145    elapsed = Time.now-@start_time
146    if reset
147      @start_time = Time.now
148      @histogram = Histogram.new(@histres, @histmax)
149    end
150    Grpc::Testing::ClientStats.new(latencies: lat, time_elapsed: elapsed)
151  end
152  def shutdown
153    @done = true
154    @child_threads.each do |thread|
155      thread.join
156    end
157  end
158end
159