# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require_relative '../grpc'
require_relative 'active_call'
require_relative 'service'
require 'thread'

# GRPC contains the General RPC module.
module GRPC
  # Pool is a simple thread pool.
  #
  # Every worker thread owns a private single-slot queue. While a worker is
  # idle its queue sits in @ready_workers; #schedule pops one such queue and
  # pushes exactly one job onto it.
  class Pool
    # Default keep alive period is 1s
    DEFAULT_KEEP_ALIVE = 1

    # @param size [Integer] number of worker threads; must be positive
    # @param keep_alive [Numeric] seconds #stop waits for workers to exit
    #   before forcibly terminating them
    # @raise [RuntimeError] if size is not positive
    def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
      fail 'pool size must be positive' unless size > 0
      # NOTE(review): nothing in this file pushes onto @jobs; it is only read
      # by #jobs_waiting.
      @jobs = Queue.new
      @size = size
      @stopped = false
      @stop_mutex = Mutex.new # needs to be held when accessing @stopped
      @stop_cond = ConditionVariable.new
      @workers = []
      @keep_alive = keep_alive

      # Each worker thread has its own queue to push and pull jobs
      # these queues are put into @ready_workers when that worker is idle
      @ready_workers = Queue.new
    end

    # Returns the number of jobs waiting
    def jobs_waiting
      @jobs.size
    end

    # @return [true, false] true when at least one idle worker is available
    def ready_for_work?
      # Busy worker threads are either doing work, or have a single job
      # waiting on them. Workers that are idle with no jobs waiting
      # have their "queues" in @ready_workers
      !@ready_workers.empty?
    end

    # Runs the given block on the queue with the provided args.
    #
    # Does nothing when no block is given, and only logs a warning when the
    # pool has already been stopped.
    #
    # @param args the args passed blk when it is called
    # @param blk the block to call
    # @raise [RuntimeError] if no idle worker is available, or the chosen
    #   worker unexpectedly already has a job queued
    def schedule(*args, &blk)
      return if blk.nil?
      @stop_mutex.synchronize do
        if @stopped
          GRPC.logger.warn('did not schedule job, already stopped')
          return
        end
        GRPC.logger.info('schedule another job')
        fail 'No worker threads available' if @ready_workers.empty?
        worker_queue = @ready_workers.pop

        fail 'worker already has a task waiting' unless worker_queue.empty?
        worker_queue << [blk, args]
      end
    end

    # Starts running the jobs in the thread pool.
    #
    # Spawns worker threads until @workers holds @size of them; each new
    # worker's queue is registered as idle before the thread is created.
    #
    # @raise [RuntimeError] if the pool has already been stopped
    def start
      @stop_mutex.synchronize do
        fail 'already stopped' if @stopped
      end
      until @workers.size == @size.to_i
        new_worker_queue = Queue.new
        @ready_workers << new_worker_queue
        next_thread = Thread.new(new_worker_queue) do |jobs|
          catch(:exit) do # allows { throw :exit } to kill a thread
            loop_execute_jobs(jobs)
          end
          remove_current_thread
        end
        @workers << next_thread
      end
    end

    # Stops the jobs in the pool
    #
    # Marks the pool stopped, hands every idle worker a job that throws
    # :exit, waits up to @keep_alive seconds for the remaining workers to
    # finish, then forcibly terminates whatever is still alive.
    def stop
      GRPC.logger.info('stopping, will wait for all the workers to exit')
      @stop_mutex.synchronize do # wait @keep_alive seconds for workers to stop
        @stopped = true
        # Only idle workers are in @ready_workers; busy ones notice @stopped
        # themselves once their current job finishes.
        while ready_for_work?
          worker_queue = @ready_workers.pop
          worker_queue << [proc { throw :exit }, []]
        end
        @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.any?
      end
      forcibly_stop_workers
      GRPC.logger.info('stopped, all workers are shutdown')
    end

    protected

    # Forcibly shutdown any threads that are still alive.
    def forcibly_stop_workers
      return if @workers.empty?
      GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
      @workers.each do |t|
        next unless t.alive?
        begin
          t.exit
        rescue StandardError => e
          GRPC.logger.warn('error while terminating a worker')
          GRPC.logger.warn(e)
        end
      end
    end

    # removes the threads from workers, and signal when all the
    # threads are complete.
    def remove_current_thread
      @stop_mutex.synchronize do
        @workers.delete(Thread.current)
        @stop_cond.signal if @workers.empty?
      end
    end

    # Worker main loop: pops one job at a time from this worker's own queue
    # and runs it, logging (not propagating) errors raised by the job.
    #
    # After each job the worker returns if the pool has been stopped;
    # otherwise it re-registers its queue as idle in @ready_workers.
    #
    # @param worker_queue [Queue] the queue owned by the calling worker
    # @raise [RuntimeError] if a second job was queued while the worker was
    #   busy
    def loop_execute_jobs(worker_queue)
      loop do
        begin
          blk, args = worker_queue.pop
          blk.call(*args)
        rescue StandardError, GRPC::Core::CallError => e
          GRPC.logger.warn('Error in worker thread')
          GRPC.logger.warn(e)
        end
        # there shouldn't be any work given to this thread while its busy
        fail('received a task while busy') unless worker_queue.empty?
        @stop_mutex.synchronize do
          return if @stopped
          @ready_workers << worker_queue
        end
      end
    end
  end

  # RpcServer hosts a number of services and makes them available on the
  # network.
  class RpcServer
    include Core::CallOps
    include Core::TimeConsts
    extend ::Forwardable

    def_delegators :@server, :add_http2_port

    # Default thread pool size is 30
    DEFAULT_POOL_SIZE = 30

    # Deprecated due to internal changes to the thread pool
    DEFAULT_MAX_WAITING_REQUESTS = 20

    # Default poll period is 1s
    DEFAULT_POLL_PERIOD = 1

    # Signal check period is 0.25s
    SIGNAL_CHECK_PERIOD = 0.25

    # setup_connect_md_proc is used by #initialize to validate the
    # connect_md_proc.
    #
    # @param a_proc [Proc, nil] the candidate proc
    # @return [Proc, nil] a_proc unchanged
    # @raise [TypeError] if a_proc is neither nil nor a Proc
    def self.setup_connect_md_proc(a_proc)
      return nil if a_proc.nil?
      fail(TypeError, '!Proc') unless a_proc.is_a? Proc
      a_proc
    end

    # Creates a new RpcServer.
    #
    # The RPC server is configured using keyword arguments.
    #
    # There are some specific keyword args used to configure the RpcServer
    # instance.
    #
    # * pool_size: the size of the thread pool the server uses to run its
    # threads. No more concurrent requests can be made than the size
    # of the thread pool
    #
    # * max_waiting_requests: Deprecated due to internal changes to the thread
    # pool. This is still an argument for compatibility but is ignored.
    #
    # * poll_period: The amount of time in seconds to wait for
    # currently-serviced RPC's to finish before cancelling them when shutting
    # down the server.
    #
    # * pool_keep_alive: The amount of time in seconds to wait
    # for currently busy thread-pool threads to finish before
    # forcing an abrupt exit to each thread. Defaults to
    # Pool::DEFAULT_KEEP_ALIVE.
    #
    # * connect_md_proc:
    # when non-nil is a proc for determining metadata to send back to the
    # client on receiving an invocation req. The proc signature is:
    #   {key: val, ..} func(method_name, {key: val, ...})
    #
    # * server_args:
    # A server arguments hash to be passed down to the underlying core server
    #
    # * interceptors:
    # An array of GRPC::ServerInterceptor objects that will be used for
    # intercepting server handlers to provide extra functionality.
    # Interceptors are an EXPERIMENTAL API.
    #
    def initialize(pool_size: DEFAULT_POOL_SIZE,
                   max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
                   poll_period: DEFAULT_POLL_PERIOD,
                   pool_keep_alive: GRPC::Pool::DEFAULT_KEEP_ALIVE,
                   connect_md_proc: nil,
                   server_args: {},
                   interceptors: [])
      @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
      @max_waiting_requests = max_waiting_requests
      @poll_period = poll_period
      @pool_size = pool_size
      @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive)
      @run_cond = ConditionVariable.new
      @run_mutex = Mutex.new
      # running_state can take 4 values: :not_started, :running, :stopping, and
      # :stopped. State transitions can only proceed in that order.
      @running_state = :not_started
      @server = Core::Server.new(server_args)
      @interceptors = InterceptorRegistry.new(interceptors)
    end

    # stops a running server
    #
    # the call has no impact if the server is already stopped, otherwise
    # server's current call loop is its last.
    #
    # @raise [RuntimeError] if the server has not been started
    def stop
      @run_mutex.synchronize do
        fail 'Cannot stop before starting' if @running_state == :not_started
        # This return leaves #stop entirely, so @pool.stop below is skipped
        # when the server is already stopping or stopped.
        return if @running_state != :running
        transition_running_state(:stopping)
        deadline = from_relative_time(@poll_period)
        @server.shutdown_and_notify(deadline)
      end
      @pool.stop
    end

    # @return [Symbol] the current running state, read under @run_mutex
    def running_state
      @run_mutex.synchronize do
        return @running_state
      end
    end

    # Can only be called while holding @run_mutex
    #
    # @param target_state [Symbol] the state to move to
    # @raise [RuntimeError] unless target_state directly follows the current
    #   state in :not_started -> :running -> :stopping -> :stopped
    def transition_running_state(target_state)
      state_transitions = {
        not_started: :running,
        running: :stopping,
        stopping: :stopped
      }
      if state_transitions[@running_state] == target_state
        @running_state = target_state
      else
        fail "Bad server state transition: #{@running_state}->#{target_state}"
      end
    end

    # @return [true, false] true if the running state is :running
    def running?
      running_state == :running
    end

    # @return [true, false] true if the running state is :stopped
    def stopped?
      running_state == :stopped
    end

    # Is called from other threads to wait for #run to start up the server.
    #
    # If the server is still :not_started, this waits (once) on the run
    # condition until #run broadcasts it or the timeout elapses; in any other
    # state it returns without waiting.
    #
    # @param timeout [Numeric] number of seconds to wait
    # @return [true, false] true if the server is running, false otherwise
    def wait_till_running(timeout = nil)
      @run_mutex.synchronize do
        @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
        return @running_state == :running
      end
    end

    # handle registration of classes
    #
    # service is either a class that includes GRPC::GenericService and whose
    # #new function can be called without argument or any instance of such a
    # class.
    #
    # E.g, after
    #
    # class Divider
    #   include GRPC::GenericService
    #   rpc :div DivArgs, DivReply    # single request, single response
    #   def initialize(optional_arg='default option') # no args
    #     ...
    #   end
    #
    # srv = GRPC::RpcServer.new(...)
    #
    # # Either of these works
    #
    # srv.handle(Divider)
    #
    # # or
    #
    # srv.handle(Divider.new('replace optional arg'))
    #
    # It raises RuntimeError:
    # - if service is not valid service class or object
    # - its handler methods are already registered
    # - if the server is already running
    #
    # @param service [Object|Class] a service class or object as described
    #        above
    def handle(service)
      @run_mutex.synchronize do
        unless @running_state == :not_started
          fail 'cannot add services if the server has been started'
        end
        cls = service.is_a?(Class) ? service : service.class
        assert_valid_service_class(cls)
        add_rpc_descs_for(service)
      end
    end

    # runs the server
    #
    # - if no rpc_descs are registered, this raises a RuntimeError, otherwise
    #   it starts the pool and the server and keeps handling calls until the
    #   server leaves the :running state.
    #
    # - #running? returns true after this is called, until #stop cause the
    #   the server to stop.
    def run
      @run_mutex.synchronize do
        fail 'cannot run without registering services' if rpc_descs.size.zero?
        @pool.start
        @server.start
        transition_running_state(:running)
        # wake up any threads blocked in #wait_till_running
        @run_cond.broadcast
      end
      loop_handle_server_calls
    end

    alias_method :run_till_terminated, :run

    # Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
    #
    # @param an_rpc the incoming rpc, as returned by @server.request_call
    # @return an_rpc when a worker is free, nil after rejecting the call
    def available?(an_rpc)
      return an_rpc if @pool.ready_for_work?
      GRPC.logger.warn('no free worker threads currently')
      noop = proc { |x| x }

      # Create a new active call that knows that metadata hasn't been
      # sent yet
      c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                         metadata_received: true, started: false)
      c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED,
                    'No free threads in thread pool')
      nil
    end

    # Sends UNIMPLEMENTED if the method is not implemented by this server
    #
    # @param an_rpc the incoming rpc, as returned by @server.request_call
    # @return an_rpc when its method is registered, nil after rejecting it
    def implemented?(an_rpc)
      mth = an_rpc.method.to_sym
      return an_rpc if rpc_descs.key?(mth)
      GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
      noop = proc { |x| x }

      # Create a new active call that knows that
      # metadata hasn't been sent yet
      c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                         metadata_received: true, started: false)
      c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
      nil
    end

    # handles calls to the server
    #
    # Requests calls from the core server while the state is :running and
    # schedules each accepted call on the thread pool. Once the loop ends it
    # transitions to :stopped and closes the core server.
    #
    # @raise [RuntimeError] if the server has not been started
    def loop_handle_server_calls
      fail 'not started' if running_state == :not_started
      while running_state == :running
        begin
          an_rpc = @server.request_call
          break if (!an_rpc.nil?) && an_rpc.call.nil?
          active_call = new_active_server_call(an_rpc)
          unless active_call.nil?
            @pool.schedule(active_call) do |ac|
              c, mth = ac
              begin
                rpc_descs[mth].run_server_method(
                  c,
                  rpc_handlers[mth],
                  @interceptors.build_context
                )
              rescue StandardError
                c.send_status(GRPC::Core::StatusCodes::INTERNAL,
                              'Server handler failed')
              end
            end
          end
        rescue Core::CallError, RuntimeError => e
          # these might happen for various reasons.  The correct behavior of
          # the server is to log them and continue, if it's not shutting down.
          if running_state == :running
            GRPC.logger.warn("server call failed: #{e}")
          end
          next
        end
      end
      # @running_state should be :stopping here
      @run_mutex.synchronize do
        transition_running_state(:stopped)
        GRPC.logger.info("stopped: #{self}")
        @server.close
      end
    end

    # Builds the ActiveCall for an incoming rpc.
    #
    # @param an_rpc the incoming rpc, as returned by @server.request_call
    # @return [Array, nil] [active_call, method_symbol], or nil when the rpc
    #   is nil, has no call, or was rejected by #available? / #implemented?
    def new_active_server_call(an_rpc)
      return nil if an_rpc.nil? || an_rpc.call.nil?

      # allow the metadata to be accessed from the call
      an_rpc.call.metadata = an_rpc.metadata  # attaches md to call for handlers
      connect_md = nil
      unless @connect_md_proc.nil?
        connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
      end

      return nil unless available?(an_rpc)
      return nil unless implemented?(an_rpc)

      # Create the ActiveCall. Indicate that metadata hasnt been sent yet.
      GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
      mth = an_rpc.method.to_sym
      rpc_desc = rpc_descs[mth]
      c = ActiveCall.new(an_rpc.call,
                         rpc_desc.marshal_proc,
                         rpc_desc.unmarshal_proc(:input),
                         an_rpc.deadline,
                         metadata_received: true,
                         started: false,
                         metadata_to_send: connect_md)
      c.attach_peer_cert(an_rpc.call.peer_cert)
      [c, mth]
    end

    protected

    # @return [Hash] registered rpc descriptions, keyed by route symbol
    def rpc_descs
      @rpc_descs ||= {}
    end

    # @return [Hash] registered handler methods, keyed by route symbol
    def rpc_handlers
      @rpc_handlers ||= {}
    end

    # @param cls [Class] the service class to validate
    # @raise [RuntimeError] if cls does not include GenericService or
    #   declares no rpc descriptions
    def assert_valid_service_class(cls)
      unless cls.include?(GenericService)
        fail "#{cls} must 'include GenericService'"
      end
      fail "#{cls} should specify some rpc descriptions" if
        cls.rpc_descs.size.zero?
    end

    # This should be called while holding @run_mutex
    #
    # Registers one route ("/<service_name>/<rpc name>") per rpc description
    # of the service. When given a class, a new instance is created for each
    # rpc to supply the handler method.
    #
    # @param service [Object|Class] a service class or instance
    # @raise [RuntimeError] if a route is already registered
    def add_rpc_descs_for(service)
      cls = service.is_a?(Class) ? service : service.class
      specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
      cls.rpc_descs.each_pair do |name, spec|
        route = "/#{cls.service_name}/#{name}".to_sym
        fail "already registered: rpc #{route} from #{spec}" if specs.key? route
        specs[route] = spec
        rpc_name = GenericService.underscore(name.to_s).to_sym
        if service.is_a?(Class)
          handlers[route] = cls.new.method(rpc_name)
        else
          handlers[route] = service.method(rpc_name)
        end
        GRPC.logger.info("handling #{route} with #{handlers[route]}")
      end
    end
  end
end