# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'forwardable'
require 'weakref'
require_relative 'bidi_call'

class Struct
  # BatchResult is the struct returned by calls to call#start_batch.
  class BatchResult
    # check_status returns the status, raising an error if the status
    # is non-nil and not OK.
    #
    # @return [Object, nil] the status, or nil when no status is present
    # @raise the exception built by GRPC::BadStatus.new_status_exception when
    #   the status code is not OK
    def check_status
      return nil if status.nil?
      if status.code != GRPC::Core::StatusCodes::OK
        GRPC.logger.debug("Failing with status #{status}")
        # raise BadStatus, propagating the metadata if present.
        fail GRPC::BadStatus.new_status_exception(
          status.code, status.details, status.metadata,
          status.debug_error_string)
      end
      status
    end
  end
end

# GRPC contains the General RPC module.
module GRPC
  # The ActiveCall class provides simple methods for sending marshallable
  # data to a call
  class ActiveCall # rubocop:disable Metrics/ClassLength
    include Core::TimeConsts
    include Core::CallOps
    extend Forwardable
    attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert
    def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
                   :trailing_metadata, :status

    # client_invoke begins a client invocation by running a
    # SEND_INITIAL_METADATA batch on the call.
    #
    # Flow Control note: this blocks until flow control accepts that client
    # request can go ahead.
    #
    # @param call [Call] a call on which to start an invocation
    # @param metadata [Hash] the metadata to send. If a value is a list,
    #   multiple metadata for its key are sent
    # @raise [TypeError] if call is not a Core::Call
    def self.client_invoke(call, metadata = {})
      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
      call.run_batch(SEND_INITIAL_METADATA => metadata)
    end

    # Creates an ActiveCall.
    #
    # ActiveCall should only be created after a call is accepted.  That
    # means different things on a client and a server.  On the client, the
    # call is accepted after calling call.invoke.  On the server, this is
    # after call.accept.
    #
    # #initialize cannot determine if the call is accepted or not; so if a
    # call that's not accepted is used here, the error won't be visible until
    # the ActiveCall methods are called.
    #
    # deadline is the absolute deadline for the call.
    #
    # @param call [Call] the call used by the ActiveCall
    # @param marshal [Function] f(obj)->string that marshal requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Fixnum] the deadline for the call to complete
    # @param started [true|false] indicates that metadata was sent
    # @param metadata_received [true|false] indicates if metadata has already
    #   been received. Should always be true for server calls
    # @param metadata_to_send [Hash, nil] initial metadata that has not been
    #   sent yet; may only be given when started is false
    # @raise [TypeError] if call is not a Core::Call
    # @raise [ArgumentError] if started is true and metadata_to_send is given
    def initialize(call, marshal, unmarshal, deadline, started: true,
                   metadata_received: false, metadata_to_send: nil)
      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
      @call = call
      @deadline = deadline
      @marshal = marshal
      @unmarshal = unmarshal
      @metadata_received = metadata_received
      @metadata_sent = started
      @op_notifier = nil

      fail(ArgumentError, 'Already sent md') if started && metadata_to_send
      # Only a call that has not started keeps a pending metadata hash.
      @metadata_to_send = metadata_to_send || {} unless started
      @send_initial_md_mutex = Mutex.new

      @output_stream_done = false
      @input_stream_done = false
      @call_finished = false
      @call_finished_mu = Mutex.new

      @client_call_executed = false
      @client_call_executed_mu = Mutex.new

      # set the peer now so that the accessor can still function
      # after the server closes the call
      @peer = call.peer
    end

    # Sends the initial metadata that has yet to be sent.
    # Does nothing if metadata has already been sent for this call.
    #
    # @param new_metadata [Hash] extra metadata merged into the pending
    #   metadata before it is sent
    def send_initial_metadata(new_metadata = {})
      @send_initial_md_mutex.synchronize do
        return if @metadata_sent
        @metadata_to_send.merge!(new_metadata)
        ActiveCall.client_invoke(@call, @metadata_to_send)
        @metadata_sent = true
      end
    end

    # output_metadata provides access to a hash that can be used to
    # save metadata to be sent as trailer
    def output_metadata
      @output_metadata ||= {}
    end

    # cancelled indicates if the call was cancelled
    def cancelled?
      !@call.status.nil? && @call.status.code == Core::StatusCodes::CANCELLED
    end

    # multi_req_view provides a restricted view of this ActiveCall for use
    # in a server client-streaming handler.
    def multi_req_view
      MultiReqView.new(self)
    end

    # single_req_view provides a restricted view of this ActiveCall for use in
    # a server request-response handler.
    def single_req_view
      SingleReqView.new(self)
    end

    # operation provides a restricted view of this ActiveCall for use as
    # an Operation. It also installs a new Notifier, which #wait and
    # #op_is_done use.
    def operation
      @op_notifier = Notifier.new
      Operation.new(self)
    end

    ##
    # Returns a restricted view of this ActiveCall for use in interceptors
    #
    # @return [InterceptableView]
    #
    def interceptable
      InterceptableView.new(self)
    end

    # Runs a RECV_STATUS_ON_CLIENT batch, marks the input stream as done and
    # records the received status on the call.
    #
    # @return the received status, as returned by BatchResult#check_status
    # @raise whatever BatchResult#check_status raises for a non-OK status
    def receive_and_check_status
      batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
      set_input_stream_done
      attach_status_results_and_complete_call(batch_result)
    end

    # Copies the status (and, when present, its metadata as trailing
    # metadata) from a batch result onto the call, then checks the status.
    #
    # @param recv_status_batch_result [Struct::BatchResult] result of a batch
    #   that contained a RECV_STATUS op
    # @return the status, as returned by BatchResult#check_status
    # @raise whatever BatchResult#check_status raises for a non-OK status
    def attach_status_results_and_complete_call(recv_status_batch_result)
      unless recv_status_batch_result.status.nil?
        @call.trailing_metadata = recv_status_batch_result.status.metadata
      end
      @call.status = recv_status_batch_result.status

      # The RECV_STATUS in run_batch always succeeds
      # Check the status for a bad status or failed run batch
      recv_status_batch_result.check_status
    end

    # remote_send sends a request to the remote endpoint.
    #
    # It blocks until the remote endpoint accepts the message.
    #
    # @param req [Object, String] the object to send or its marshalled form.
    # @param marshalled [false, true] indicates if the object is already
    #   marshalled.
    def remote_send(req, marshalled = false)
      send_initial_metadata
      GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
      payload = marshalled ? req : @marshal.call(req)
      @call.run_batch(SEND_MESSAGE => payload)
    end

    # send_status sends a status to the remote endpoint.
    #
    # @param code [int] the status code to send
    # @param details [String] details
    # @param assert_finished [true, false] when true, RECV_CLOSE_ON_SERVER is
    #   added to the batch. Defaults to false.
    # @param metadata [Hash] metadata attached to the status. If a value is a
    #   list, multiple metadata for its key are sent
    # @return [nil]
    def send_status(code = OK, details = '', assert_finished = false,
                    metadata: {})
      send_initial_metadata
      ops = {
        SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata)
      }
      ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
      @call.run_batch(ops)
      set_output_stream_done

      nil
    end

    # Intended for use on server-side calls when a single request from
    # the client is expected (i.e., unary and server-streaming RPC types).
    #
    # @return [Object, nil] the request read by #remote_read
    def read_unary_request
      req = remote_read
      set_input_stream_done
      req
    end

    # Sends a single response together with the final status in one batch,
    # including the initial metadata when it has not been sent yet.
    #
    # @param req [Object] the object to marshal and send as the message
    # @param trailing_metadata [Hash] metadata attached to the status
    # @param code [int] the status code to send
    # @param details [String] the status details
    def server_unary_response(req, trailing_metadata: {},
                              code: Core::StatusCodes::OK, details: 'OK')
      ops = {}
      @send_initial_md_mutex.synchronize do
        ops[SEND_INITIAL_METADATA] = @metadata_to_send unless @metadata_sent
        @metadata_sent = true
      end

      payload = @marshal.call(req)
      ops[SEND_MESSAGE] = payload
      ops[SEND_STATUS_FROM_SERVER] = Struct::Status.new(
        code, details, trailing_metadata)
      ops[RECV_CLOSE_ON_SERVER] = nil

      @call.run_batch(ops)
      set_output_stream_done
    end

    # remote_read reads a message from the remote endpoint.
    #
    # It blocks until the batch completes. The first read also receives the
    # initial metadata, unless it has already been received.
    #
    # @return [Object, nil] the unmarshalled message, or nil when the batch
    #   result carries no message
    def remote_read
      ops = { RECV_MESSAGE => nil }
      ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
      batch_result = @call.run_batch(ops)
      unless @metadata_received
        @call.metadata = batch_result.metadata
        @metadata_received = true
      end
      get_message_from_batch_result(batch_result)
    end

    # Extracts and unmarshals the message of a batch result.
    #
    # @param recv_message_batch_result [Struct::BatchResult, nil] result of a
    #   batch that contained a RECV_MESSAGE op
    # @return [Object, nil] the unmarshalled message, or nil when the result
    #   or its message is nil
    def get_message_from_batch_result(recv_message_batch_result)
      unless recv_message_batch_result.nil? ||
             recv_message_batch_result.message.nil?
        return @unmarshal.call(recv_message_batch_result.message)
      end
      GRPC.logger.debug('found nil; the final response has been sent')
      nil
    end

    # each_remote_read passes each response to the given block or returns an
    # enumerator of the responses if no block is given.
    # Used to generate the request enumerable for
    # server-side client-streaming RPC's.
    #
    # == Enumerator ==
    #
    # * #next blocks until the remote endpoint sends a READ or FINISHED
    # * for each read, enumerator#next yields the response
    # * once a nil response is read the enumeration ends, so enumerator#next
    #   raises StopIteration
    #
    # == Block ==
    #
    # * if provided it is executed for each response
    # * the call blocks until no more responses are provided
    #
    # @return [Enumerator] if no block was given
    def each_remote_read
      return enum_for(:each_remote_read) unless block_given?
      begin
        loop do
          resp = remote_read
          break if resp.nil?  # the last response was received
          yield resp
        end
      ensure
        set_input_stream_done
      end
    end

    # each_remote_read_then_finish passes each response to the given block or
    # returns an enumerator of the responses if no block is given.
    #
    # It is like each_remote_read, but it blocks on finishing on detecting
    # the final message.
    #
    # == Enumerator ==
    #
    # * #next blocks until the remote endpoint sends a READ or FINISHED
    # * for each read, enumerator#next yields the response
    # * after the last response the status is received and checked
    #   * if it is OK, the enumeration ends and enumerator#next raises
    #     StopIteration
    #   * if it is not OK, the error raised by BatchResult#check_status
    #     propagates
    #
    # == Block ==
    #
    # * if provided it is executed for each response
    # * the call blocks until no more responses are provided
    #
    # @return [Enumerator] if no block was given
    def each_remote_read_then_finish
      return enum_for(:each_remote_read_then_finish) unless block_given?
      loop do
        resp =
          begin
            remote_read
          rescue GRPC::Core::CallError => e
            # A failed read is logged and treated like the end of the
            # stream; the status check below then runs.
            GRPC.logger.warn("In each_remote_read_then_finish: #{e}")
            nil
          end

        break if resp.nil?  # the last response was received
        yield resp
      end

      receive_and_check_status
    ensure
      set_input_stream_done
    end

    # request_response sends a request to a GRPC server, and returns the
    # response.
    #
    # @param req [Object] the request sent to the server
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    #   a list, multiple metadata for its key are sent
    # @return [Object] the response received from the server
    # @raise [GRPC::Core::CallError] if this call has already been executed
    def request_response(req, metadata: {})
      raise_error_if_already_executed
      ops = {
        SEND_MESSAGE => @marshal.call(req),
        SEND_CLOSE_FROM_CLIENT => nil,
        RECV_INITIAL_METADATA => nil,
        RECV_MESSAGE => nil,
        RECV_STATUS_ON_CLIENT => nil
      }
      @send_initial_md_mutex.synchronize do
        # Metadata might have already been sent if this is an operation view
        unless @metadata_sent
          ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
        end
        @metadata_sent = true
      end

      begin
        batch_result = @call.run_batch(ops)
        # no need to check for cancellation after a CallError because this
        # batch contains a RECV_STATUS op
      ensure
        set_input_stream_done
        set_output_stream_done
      end

      @call.metadata = batch_result.metadata
      attach_status_results_and_complete_call(batch_result)
      get_message_from_batch_result(batch_result)
    end

    # client_streamer sends a stream of requests to a GRPC server, and
    # returns a single response.
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in typical case it will be an Enumerable
    # that allows dynamic construction of the marshallable objects.
    #
    # @param requests [Object] an Enumerable of requests to send
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    #   a list, multiple metadata for its key are sent
    # @return [Object] the response received from the server
    # @raise [GRPC::Core::CallError] if this call has already been executed
    def client_streamer(requests, metadata: {})
      raise_error_if_already_executed
      begin
        send_initial_metadata(metadata)
        requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
      rescue GRPC::Core::CallError => e
        receive_and_check_status # check for Cancelled
        raise e
      rescue => e
        set_input_stream_done
        raise e
      ensure
        set_output_stream_done
      end

      batch_result = @call.run_batch(
        SEND_CLOSE_FROM_CLIENT => nil,
        RECV_INITIAL_METADATA => nil,
        RECV_MESSAGE => nil,
        RECV_STATUS_ON_CLIENT => nil
      )

      set_input_stream_done

      @call.metadata = batch_result.metadata
      attach_status_results_and_complete_call(batch_result)
      get_message_from_batch_result(batch_result)
    end

    # server_streamer sends one request to the GRPC server, which yields a
    # stream of responses.
    #
    # responses provides an enumerator over the streamed responses, i.e. it
    # follows Ruby's #each iteration protocol.  The enumerator blocks while
    # waiting for each response, stops when the server signals that no
    # further responses will be supplied.  If the implicit block is provided,
    # it is executed with each response as the argument and no result is
    # returned.
    #
    # @param req [Object] the request sent to the server
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    #   a list, multiple metadata for its key are sent
    # @return [Enumerator|nil] a response Enumerator
    # @raise [GRPC::Core::CallError] if this call has already been executed
    def server_streamer(req, metadata: {})
      raise_error_if_already_executed
      ops = {
        SEND_MESSAGE => @marshal.call(req),
        SEND_CLOSE_FROM_CLIENT => nil
      }
      @send_initial_md_mutex.synchronize do
        # Metadata might have already been sent if this is an operation view
        unless @metadata_sent
          ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
        end
        @metadata_sent = true
      end

      begin
        @call.run_batch(ops)
      rescue GRPC::Core::CallError => e
        receive_and_check_status # checks for Cancelled
        raise e
      rescue => e
        set_input_stream_done
        raise e
      ensure
        set_output_stream_done
      end

      replies = enum_for(:each_remote_read_then_finish)
      return replies unless block_given?
      replies.each { |r| yield r }
    end

    # bidi_streamer sends a stream of requests to the GRPC server, and yields
    # a stream of responses.
    #
    # This method takes an Enumerable of requests, and returns an enumerable
    # of responses.
    #
    # == requests ==
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in typical case it will be an
    # Enumerable that allows dynamic construction of the marshallable
    # objects.
    #
    # == responses ==
    #
    # This is an enumerator of responses.  I.e, its #next method blocks
    # waiting for the next response.  Also, if at any point the block needs
    # to consume all the remaining responses, this can be done using #each or
    # #collect.  Calling #each or #collect should only be done if
    # the_call#writes_done has been called, otherwise the block will loop
    # forever.
    #
    # @param requests [Object] an Enumerable of requests to send
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    #   a list, multiple metadata for its key are sent
    # @return [Enumerator, nil] a response Enumerator
    # @raise [GRPC::Core::CallError] if this call has already been executed
    def bidi_streamer(requests, metadata: {}, &blk)
      raise_error_if_already_executed
      # Metadata might have already been sent if this is an operation view
      begin
        send_initial_metadata(metadata)
      rescue GRPC::Core::CallError => e
        batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
        set_input_stream_done
        set_output_stream_done
        attach_status_results_and_complete_call(batch_result)
        raise e
      rescue => e
        set_input_stream_done
        set_output_stream_done
        raise e
      end

      bd = BidiCall.new(@call,
                        @marshal,
                        @unmarshal,
                        metadata_received: @metadata_received)

      bd.run_on_client(requests,
                       proc { set_input_stream_done },
                       proc { set_output_stream_done },
                       &blk)
    end

    # run_server_bidi orchestrates a BiDi stream processing on a server.
    #
    # N.B. mth is a func(Enumerable<Requests>)
    #
    # It takes an enumerable of requests as an arg, in case there is a
    # relationship between the stream of requests and the stream of replies.
    #
    # This does not mean that there must necessarily be one.  E.g, the
    # replies produced by mth could ignore the received_msgs
    #
    # @param mth [Proc] generates the BiDi stream replies
    # @param interception_ctx [InterceptionContext]
    #
    def run_server_bidi(mth, interception_ctx)
      view = multi_req_view
      bidi_call = BidiCall.new(
        @call,
        @marshal,
        @unmarshal,
        metadata_received: @metadata_received,
        req_view: view
      )
      requests = bidi_call.read_next_loop(proc { set_input_stream_done }, false)
      interception_ctx.intercept!(
        :bidi_streamer,
        call: view,
        method: mth,
        requests: requests
      ) do
        bidi_call.run_on_server(mth, requests)
      end
    end

    # Waits till an operation completes.
    # Returns immediately when no notifier has been installed by #operation.
    def wait
      return if @op_notifier.nil?
      GRPC.logger.debug("active_call.wait: on #{@op_notifier}")
      @op_notifier.wait
    end

    # Signals that an operation is done.
    # Only relevant on the client-side (this is a no-op on the server-side)
    def op_is_done
      return if @op_notifier.nil?
      @op_notifier.notify(self)
    end

    # Add to the metadata that will be sent from the server.
    # Fails if metadata has already been sent.
    # Unused by client calls.
    #
    # @param new_metadata [Hash] metadata merged into the pending metadata
    # @return [Hash] the merged pending metadata
    def merge_metadata_to_send(new_metadata = {})
      @send_initial_md_mutex.synchronize do
        fail('cant change metadata after already sent') if @metadata_sent
        @metadata_to_send.merge!(new_metadata)
      end
    end

    # Stores the peer certificate exposed through the peer_cert reader.
    #
    # @param peer_cert [Object] the certificate to remember
    def attach_peer_cert(peer_cert)
      @peer_cert = peer_cert
    end

    private

    # To be called once the "input stream" has been completely
    # read through (i.e, done reading from client or received status)
    # note this is idempotent
    def set_input_stream_done
      @call_finished_mu.synchronize do
        @input_stream_done = true
        maybe_finish_and_close_call_locked
      end
    end

    # To be called once the "output stream" has been completely
    # sent through (i.e, done sending from client or sent status)
    # note this is idempotent
    def set_output_stream_done
      @call_finished_mu.synchronize do
        @output_stream_done = true
        maybe_finish_and_close_call_locked
      end
    end

    # Finishes the call once both streams are done: notifies any waiter and
    # closes the call. Runs at most once; callers hold @call_finished_mu.
    def maybe_finish_and_close_call_locked
      return unless @output_stream_done && @input_stream_done
      return if @call_finished
      @call_finished = true
      op_is_done
      @call.close
    end

    # Starts the call if not already started
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    #   a list, multiple metadata for its key are sent
    def start_call(metadata = {})
      merge_metadata_to_send(metadata) && send_initial_metadata
    end

    # Marks this call as executed, failing when it was already executed.
    #
    # @raise [GRPC::Core::CallError] on any attempt after the first
    def raise_error_if_already_executed
      @client_call_executed_mu.synchronize do
        if @client_call_executed
          fail GRPC::Core::CallError, 'attempting to re-run a call'
        end
        @client_call_executed = true
      end
    end

    # Builds an anonymous class that wraps an ActiveCall and forwards only
    # the listed methods to it.
    #
    # @param visible_methods [Array<Symbol>] the methods to delegate
    # @return [Class] the view class
    def self.view_class(*visible_methods)
      Class.new do
        extend ::Forwardable
        def_delegators :@wrapped, *visible_methods

        # @param wrapped [ActiveCall] the call whose methods are shielded
        def initialize(wrapped)
          @wrapped = wrapped
        end
      end
    end

    # SingleReqView limits access to an ActiveCall's methods for use in server
    # handlers that receive just one request.
    SingleReqView = view_class(:cancelled?, :deadline, :metadata,
                               :output_metadata, :peer, :peer_cert,
                               :send_initial_metadata,
                               :metadata_to_send,
                               :merge_metadata_to_send,
                               :metadata_sent)

    # MultiReqView limits access to an ActiveCall's methods for use in
    # server client_streamer handlers.
    MultiReqView = view_class(:cancelled?, :deadline,
                              :each_remote_read, :metadata, :output_metadata,
                              :peer, :peer_cert,
                              :send_initial_metadata,
                              :metadata_to_send,
                              :merge_metadata_to_send,
                              :metadata_sent)

    # Operation limits access to an ActiveCall's methods for use as
    # an Operation on the client.
    #
    # NOTE(review): :execute has no definition in this file and :start_call
    # is declared under `private` above -- confirm how callers reach them.
    Operation = view_class(:cancel, :cancelled?, :deadline, :execute,
                           :metadata, :status, :start_call, :wait, :write_flag,
                           :write_flag=, :trailing_metadata)

    # InterceptableView further limits access to an ActiveCall's methods
    # for use in interceptors on the client, exposing only the deadline
    InterceptableView = view_class(:deadline)
  end
end