#!/usr/bin/env ruby

# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Worker and worker service implementation

# Make both the sibling 'lib' directory and this directory requirable.
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(File.dirname(this_dir), 'lib')
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)

require 'grpc'
require 'optparse'
require 'histogram'
require 'etc'
require 'facter'
require 'client'
require 'qps-common'
require 'server'
require 'src/proto/grpc/testing/worker_service_services_pb'

# Implementation of the benchmark WorkerService. It starts a benchmark
# server or client on request from a driver, reports their stats, and
# stops the hosting RPC server when asked to quit.
class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
  # @param s  the RPC server hosting this service; stopped by quit_worker
  # @param sp the port handed to every BenchmarkServer created by run_server
  def initialize(s, sp)
    @server = s
    @server_port = sp
    # Set by quit_worker; stays nil when the server stops some other way.
    @shutdown_thread = nil
  end

  # Returns the processor count reported by Facter.
  def cpu_cores
    Facter.value('processors')['count']
  end

  # Handles a stream of server requests.
  #
  # A 'setup' request creates a BenchmarkServer and replies with its initial
  # stats and port; a 'mark' request replies with the current stats (resetting
  # them when req.mark.reset is set) and the core count. Requests are consumed
  # on a background thread and replies are handed back through an
  # EnumeratorQueue.
  #
  # @param reqs enumerable of incoming requests
  # @return the result of EnumeratorQueue#each_item (the stream of replies)
  def run_server(reqs)
    # NOTE(review): self is presumably the end-of-stream sentinel for
    # EnumeratorQueue -- confirm against its definition.
    q = EnumeratorQueue.new(self)
    Thread.new do
      # nil until a 'setup' request arrives. A 'mark' that precedes 'setup'
      # still raises, as it did before.
      bms = nil
      gtss = Grpc::Testing::ServerStatus
      reqs.each do |req|
        case req.argtype.to_s
        when 'setup'
          bms = BenchmarkServer.new(req.setup, @server_port)
          q.push(gtss.new(stats: bms.mark(false), port: bms.get_port))
        when 'mark'
          q.push(gtss.new(stats: bms.mark(req.mark.reset), cores: cpu_cores))
        end
      end
      # The stream may end before any 'setup'; there is then nothing to stop.
      bms.stop if bms
      q.push(self)
    end
    q.each_item
  end

  # Handles a stream of client requests.
  #
  # A 'setup' request creates a BenchmarkClient and replies with its initial
  # stats; a 'mark' request replies with the current stats (resetting them
  # when req.mark.reset is set). Requests are consumed on a background thread
  # and replies are handed back through an EnumeratorQueue.
  #
  # @param reqs enumerable of incoming requests
  # @return the result of EnumeratorQueue#each_item (the stream of replies)
  def run_client(reqs)
    # NOTE(review): self is presumably the end-of-stream sentinel for
    # EnumeratorQueue -- confirm against its definition.
    q = EnumeratorQueue.new(self)
    Thread.new do
      # nil until a 'setup' request arrives. A 'mark' that precedes 'setup'
      # still raises, as it did before.
      client = nil
      reqs.each do |req|
        case req.argtype.to_s
        when 'setup'
          client = BenchmarkClient.new(req.setup)
          q.push(Grpc::Testing::ClientStatus.new(stats: client.mark(false)))
        when 'mark'
          q.push(Grpc::Testing::ClientStatus.new(stats:
            client.mark(req.mark.reset)))
        end
      end
      # The stream may end before any 'setup'; there is then nothing to shut
      # down.
      client.shutdown if client
      q.push(self)
    end
    q.each_item
  end

  # Replies with the number of cores reported by cpu_cores.
  def core_count(_args, _call)
    Grpc::Testing::CoreResponse.new(cores: cpu_cores)
  end

  # Stops the hosting RPC server on a separate thread and replies at once.
  # NOTE(review): the stop is presumably off-thread so this RPC can still
  # return its response -- confirm.
  def quit_worker(_args, _call)
    @shutdown_thread = Thread.new { @server.stop }
    Grpc::Testing::Void.new
  end

  # Waits for the thread started by quit_worker. Does nothing when
  # quit_worker was never called.
  def join_shutdown_thread
    @shutdown_thread.join if @shutdown_thread
  end
end

# Parses --driver_port and --server_port (both default to 0), serves the
# worker service on the driver port, and blocks until the server stops.
def main
  options = {
    'driver_port' => 0,
    'server_port' => 0
  }
  OptionParser.new do |opts|
    opts.banner = 'Usage: [--driver_port <port>] [--server_port <port>]'
    opts.on('--driver_port PORT', '<port>') do |v|
      options['driver_port'] = v
    end
    opts.on('--server_port PORT', '<port>') do |v|
      options['server_port'] = v
    end
  end.parse!

  # Configure any errors with client or server child threads to surface
  Thread.abort_on_exception = true

  s = GRPC::RpcServer.new(poll_period: 3)
  s.add_http2_port("0.0.0.0:#{options['driver_port']}",
                   :this_port_is_insecure)
  worker_service = WorkerServiceImpl.new(s, options['server_port'].to_i)
  s.handle(worker_service)
  s.run
  worker_service.join_shutdown_thread
end

main