# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'forwardable'
require 'weakref'
require_relative 'bidi_call'

class Struct
  # BatchResult is the struct returned by calls to call#start_batch.
  class BatchResult
    # check_status returns the status, raising an error if the status
    # is non-nil and not OK.
    #
    # @return [nil] when no status is attached to this result
    # @return [Object] the status itself when its code is OK
    # @raise [GRPC::Cancelled] when the status code is CANCELLED
    # @raise [GRPC::BadStatus] (as built by BadStatus.new_status_exception)
    #   for any other non-OK status code
    def check_status
      return nil if status.nil?
      fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
      if status.code != GRPC::Core::StatusCodes::OK
        GRPC.logger.debug("Failing with status #{status}")
        # raise BadStatus, propagating the metadata if present.
        md = status.metadata
        fail GRPC::BadStatus.new_status_exception(
          status.code, status.details, md)
      end
      status
    end
  end
end

# GRPC contains the General RPC module.
module GRPC
  # The ActiveCall class provides simple methods for sending marshallable
  # data to a call.
  #
  # It wraps a Core::Call and tracks, per call, whether initial metadata has
  # been sent/received and whether the input and output streams are done, so
  # that the underlying call is closed exactly once.
  class ActiveCall # rubocop:disable Metrics/ClassLength
    include Core::TimeConsts
    include Core::CallOps
    extend Forwardable
    attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert
    def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
                   :trailing_metadata, :status

    # client_invoke begins a client invocation by running a batch that
    # contains only the SEND_INITIAL_METADATA op.
    #
    # Flow Control note: this blocks until flow control accepts that client
    # request can go ahead.
    #
    # If a metadata value is a list, multiple metadata for its key are sent.
    #
    # @param call [Call] a call on which to start an invocation
    # @param metadata [Hash] the initial metadata to send
    # @return [Object] whatever call.run_batch returns
    # @raise [TypeError] if call is not a Core::Call
    def self.client_invoke(call, metadata = {})
      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
      call.run_batch(SEND_INITIAL_METADATA => metadata)
    end

    # Creates an ActiveCall.
    #
    # ActiveCall should only be created after a call is accepted.  That
    # means different things on a client and a server.  On the client, the
    # call is accepted after calling call.invoke.  On the server, this is
    # after call.accept.
    #
    # #initialize cannot determine if the call is accepted or not; so if a
    # call that's not accepted is used here, the error won't be visible until
    # the ActiveCall methods are called.
    #
    # deadline is the absolute deadline for the call.
    #
    # @param call [Call] the call used by the ActiveCall
    # @param marshal [Function] f(obj)->string that marshal requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Fixnum] the deadline for the call to complete
    # @param started [true|false] indicates that metadata was sent
    # @param metadata_received [true|false] indicates if metadata has already
    #     been received. Should always be true for server calls
    # @param metadata_to_send [Hash, nil] initial metadata to send later;
    #     may only be given when started is false (defaults to {} then)
    # @raise [TypeError] if call is not a Core::Call
    # @raise [ArgumentError] if started is true and metadata_to_send is given
    def initialize(call, marshal, unmarshal, deadline, started: true,
                   metadata_received: false, metadata_to_send: nil)
      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
      @call = call
      @deadline = deadline
      @marshal = marshal
      @unmarshal = unmarshal
      @metadata_received = metadata_received
      @metadata_sent = started
      @op_notifier = nil
      # Defined up front so the peer_cert reader never touches an unset
      # instance variable; attach_peer_cert fills it in later.
      @peer_cert = nil

      fail(ArgumentError, 'Already sent md') if started && metadata_to_send
      # Only tracked while the initial metadata is still unsent; stays nil
      # for calls that were created as already started.
      @metadata_to_send = metadata_to_send || {} unless started
      @send_initial_md_mutex = Mutex.new

      @output_stream_done = false
      @input_stream_done = false
      @call_finished = false
      @call_finished_mu = Mutex.new

      @client_call_executed = false
      @client_call_executed_mu = Mutex.new

      # set the peer now so that the accessor can still function
      # after the server closes the call
      @peer = call.peer
    end

    # Sends the initial metadata that has yet to be sent.
    # Does nothing if metadata has already been sent for this call.
    #
    # The check, the merge and the send all happen while holding
    # @send_initial_md_mutex.
    #
    # @param new_metadata [Hash] extra entries merged into metadata_to_send
    #     before sending
    def send_initial_metadata(new_metadata = {})
      @send_initial_md_mutex.synchronize do
        return if @metadata_sent
        @metadata_to_send.merge!(new_metadata)
        ActiveCall.client_invoke(@call, @metadata_to_send)
        @metadata_sent = true
      end
    end

    # output_metadata provides access to a hash that can be used to
    # save metadata to be sent as trailer. The hash is created lazily on
    # first access.
    #
    # @return [Hash]
    def output_metadata
      @output_metadata ||= {}
    end

    # cancelled? indicates if the call was cancelled, i.e. the call has a
    # status and that status' code is CANCELLED.
    def cancelled?
      !@call.status.nil? && @call.status.code == Core::StatusCodes::CANCELLED
    end

    # multi_req_view provides a restricted view of this ActiveCall for use
    # in a server client-streaming handler.
    #
    # @return [MultiReqView]
    def multi_req_view
      MultiReqView.new(self)
    end

    # single_req_view provides a restricted view of this ActiveCall for use in
    # a server request-response handler.
    #
    # @return [SingleReqView]
    def single_req_view
      SingleReqView.new(self)
    end

    # operation provides a restricted view of this ActiveCall for use as
    # an Operation.
    #
    # It also installs a fresh Notifier, which is what #wait blocks on and
    # what #op_is_done notifies.
    #
    # @return [Operation]
    def operation
      @op_notifier = Notifier.new
      Operation.new(self)
    end

    ##
    # Returns a restricted view of this ActiveCall for use in interceptors
    #
    # @return [InterceptableView]
    #
    def interceptable
      InterceptableView.new(self)
    end

    # Runs a batch containing only RECV_STATUS_ON_CLIENT, marks the input
    # stream as done, then records and checks the received status.
    #
    # @return [Object] the result of
    #     attach_status_results_and_complete_call
    def receive_and_check_status
      batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
      set_input_stream_done
      attach_status_results_and_complete_call(batch_result)
    end

    # Copies the status carried by a batch result onto the underlying call
    # (and its metadata into the call's trailing metadata when a status is
    # present), then checks the status.
    #
    # @param recv_status_batch_result [Object] a batch result responding to
    #     #status and #check_status
    # @return [Object] the value of check_status on the batch result
    def attach_status_results_and_complete_call(recv_status_batch_result)
      unless recv_status_batch_result.status.nil?
        @call.trailing_metadata = recv_status_batch_result.status.metadata
      end
      @call.status = recv_status_batch_result.status

      # The RECV_STATUS in run_batch always succeeds
      # Check the status for a bad status or failed run batch
      recv_status_batch_result.check_status
    end

    # remote_send sends a request to the remote endpoint.
    #
    # It blocks until the remote endpoint accepts the message. Any unsent
    # initial metadata is sent first.
    #
    # @param req [Object, String] the object to send or its marshalled form.
    # @param marshalled [false, true] indicates if the object is already
    # marshalled.
    def remote_send(req, marshalled = false)
      send_initial_metadata
      GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
      payload = marshalled ? req : @marshal.call(req)
      @call.run_batch(SEND_MESSAGE => payload)
    end

    # send_status sends a status to the remote endpoint.
    #
    # Any unsent initial metadata is sent first. After the batch has run,
    # the output stream is marked as done.
    #
    # @param code [int] the status code to send
    # @param details [String] details
    # @param assert_finished [true, false] when true, RECV_CLOSE_ON_SERVER
    #     is added to the same batch as the status. Defaults to false.
    # @param metadata [Hash] metadata to attach to the status that is sent.
    #     If a value is a list, multiple metadata for its key are sent
    # @return [nil]
    def send_status(code = OK, details = '', assert_finished = false,
                    metadata: {})
      send_initial_metadata
      ops = {
        SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata)
      }
      ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
      @call.run_batch(ops)
      set_output_stream_done

      nil
    end

    # Intended for use on server-side calls when a single request from
    # the client is expected (i.e., unary and server-streaming RPC types).
    #
    # Reads one message and then marks the input stream as done.
    #
    # @return [Object, nil] the value returned by remote_read
    def read_unary_request
      req = remote_read
      set_input_stream_done
      req
    end

    # Sends a single response together with the final status in one batch.
    #
    # The batch contains SEND_MESSAGE, SEND_STATUS_FROM_SERVER and
    # RECV_CLOSE_ON_SERVER, plus SEND_INITIAL_METADATA when the initial
    # metadata has not been sent yet. Afterwards the output stream is
    # marked as done.
    #
    # @param req [Object] the response object; it is passed through the
    #     marshal function before sending
    # @param trailing_metadata [Hash] metadata attached to the status
    # @param code [int] the status code to send
    # @param details [String] the status details
    def server_unary_response(req, trailing_metadata: {},
                              code: Core::StatusCodes::OK, details: 'OK')
      ops = {}
      @send_initial_md_mutex.synchronize do
        ops[SEND_INITIAL_METADATA] = @metadata_to_send unless @metadata_sent
        @metadata_sent = true
      end

      payload = @marshal.call(req)
      ops[SEND_MESSAGE] = payload
      ops[SEND_STATUS_FROM_SERVER] = Struct::Status.new(
        code, details, trailing_metadata)
      ops[RECV_CLOSE_ON_SERVER] = nil

      @call.run_batch(ops)
      set_output_stream_done
    end

    # remote_read reads a message from the remote endpoint.
    #
    # It runs a RECV_MESSAGE batch; the first time through (while initial
    # metadata has not been received) RECV_INITIAL_METADATA is added to the
    # batch and the received metadata is stored on the call.
    #
    # @return [Object, nil] the unmarshalled message, or nil when the batch
    #     result carries no message
    def remote_read
      ops = { RECV_MESSAGE => nil }
      ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
      batch_result = @call.run_batch(ops)
      unless @metadata_received
        @call.metadata = batch_result.metadata
        @metadata_received = true
      end
      get_message_from_batch_result(batch_result)
    end

    # Extracts and unmarshals the message of a batch result.
    #
    # @param recv_message_batch_result [Object, nil] a batch result
    #     responding to #message
    # @return [Object, nil] the unmarshalled message; nil when the result
    #     or its message is nil
    def get_message_from_batch_result(recv_message_batch_result)
      unless recv_message_batch_result.nil? ||
             recv_message_batch_result.message.nil?
        return @unmarshal.call(recv_message_batch_result.message)
      end
      GRPC.logger.debug('found nil; the final response has been sent')
      nil
    end

    # each_remote_read passes each response to the given block or returns an
    # enumerator of the responses if no block is given.
    # Used to generate the request enumerable for
    # server-side client-streaming RPC's.
    #
    # == Block ==
    #
    # * if provided it is executed for each message returned by remote_read
    # * iteration stops at the first nil returned by remote_read
    # * the input stream is marked as done when iteration ends, whether
    #   normally or through an exception
    #
    # @return [Enumerator] if no block was given
    def each_remote_read
      return enum_for(:each_remote_read) unless block_given?
      begin
        loop do
          resp = remote_read
          break if resp.nil?  # the last response was received
          yield resp
        end
      ensure
        set_input_stream_done
      end
    end

    # each_remote_read_then_finish passes each response to the given block or
    # returns an enumerator of the responses if no block is given.
    #
    # It is like each_remote_read, but once the final message has been
    # detected it also receives and checks the call status.
    #
    # A GRPC::Core::CallError raised while reading is logged as a warning
    # and treated as the end of the stream; the status check that follows
    # decides whether an error is raised.
    #
    # == Block ==
    #
    # * if provided it is executed for each message returned by remote_read
    # * iteration stops at the first nil message
    # * the input stream is marked as done when the method exits, whether
    #   normally or through an exception
    #
    # @return [Enumerator] if no block was given
    def each_remote_read_then_finish
      return enum_for(:each_remote_read_then_finish) unless block_given?
      loop do
        resp =
          begin
            remote_read
          rescue GRPC::Core::CallError => e
            GRPC.logger.warn("In each_remote_read_then_finish: #{e}")
            nil
          end

        break if resp.nil?  # the last response was received
        yield resp
      end

      receive_and_check_status
    ensure
      set_input_stream_done
    end

    # request_response sends a request to a GRPC server, and returns the
    # response.
    #
    # Everything is done in a single batch: send (initial metadata if still
    # unsent, message, close) and receive (initial metadata, message,
    # status).
    #
    # @param req [Object] the request sent to the server
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Object] the response received from the server
    # @raise [GRPC::Core::CallError] if a client call was already executed
    #     on this ActiveCall
    def request_response(req, metadata: {})
      raise_error_if_already_executed
      ops = {
        SEND_MESSAGE => @marshal.call(req),
        SEND_CLOSE_FROM_CLIENT => nil,
        RECV_INITIAL_METADATA => nil,
        RECV_MESSAGE => nil,
        RECV_STATUS_ON_CLIENT => nil
      }
      @send_initial_md_mutex.synchronize do
        # Metadata might have already been sent if this is an operation view
        unless @metadata_sent
          ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
        end
        @metadata_sent = true
      end

      begin
        batch_result = @call.run_batch(ops)
        # no need to check for cancellation after a CallError because this
        # batch contains a RECV_STATUS op
      ensure
        set_input_stream_done
        set_output_stream_done
      end

      @call.metadata = batch_result.metadata
      attach_status_results_and_complete_call(batch_result)
      get_message_from_batch_result(batch_result)
    end

    # client_streamer sends a stream of requests to a GRPC server, and
    # returns a single response.
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in typical case it will be an Enumerable
    # that allows dynamic construction of the marshallable objects.
    #
    # If sending fails with a GRPC::Core::CallError, the call status is
    # received and checked first (which may raise instead), and otherwise
    # the CallError is re-raised.
    #
    # @param requests [Object] an Enumerable of requests to send
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Object] the response received from the server
    # @raise [GRPC::Core::CallError] if a client call was already executed
    #     on this ActiveCall
    def client_streamer(requests, metadata: {})
      raise_error_if_already_executed
      begin
        send_initial_metadata(metadata)
        requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
      rescue GRPC::Core::CallError => e
        receive_and_check_status # check for Cancelled
        raise e
      rescue => e
        set_input_stream_done
        raise e
      ensure
        set_output_stream_done
      end

      batch_result = @call.run_batch(
        SEND_CLOSE_FROM_CLIENT => nil,
        RECV_INITIAL_METADATA => nil,
        RECV_MESSAGE => nil,
        RECV_STATUS_ON_CLIENT => nil
      )

      set_input_stream_done

      @call.metadata = batch_result.metadata
      attach_status_results_and_complete_call(batch_result)
      get_message_from_batch_result(batch_result)
    end

    # server_streamer sends one request to the GRPC server, which yields a
    # stream of responses.
    #
    # responses provides an enumerator over the streamed responses, i.e. it
    # follows Ruby's #each iteration protocol.  The enumerator blocks while
    # waiting for each response, stops when the server signals that no
    # further responses will be supplied.  If the implicit block is provided,
    # it is executed with each response as the argument.
    #
    # If sending fails with a GRPC::Core::CallError, the call status is
    # received and checked first (which may raise instead), and otherwise
    # the CallError is re-raised.
    #
    # @param req [Object] the request sent to the server
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Enumerator] a response Enumerator when no block is given
    # @raise [GRPC::Core::CallError] if a client call was already executed
    #     on this ActiveCall
    def server_streamer(req, metadata: {})
      raise_error_if_already_executed
      ops = {
        SEND_MESSAGE => @marshal.call(req),
        SEND_CLOSE_FROM_CLIENT => nil
      }
      @send_initial_md_mutex.synchronize do
        # Metadata might have already been sent if this is an operation view
        unless @metadata_sent
          ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
        end
        @metadata_sent = true
      end

      begin
        @call.run_batch(ops)
      rescue GRPC::Core::CallError => e
        receive_and_check_status # checks for Cancelled
        raise e
      rescue => e
        set_input_stream_done
        raise e
      ensure
        set_output_stream_done
      end

      replies = enum_for(:each_remote_read_then_finish)
      return replies unless block_given?
      replies.each { |r| yield r }
    end

    # bidi_streamer sends a stream of requests to the GRPC server, and yields
    # a stream of responses.
    #
    # This method takes an Enumerable of requests, and returns an enumerable
    # of responses.
    #
    # == requests ==
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in typical case it will be an
    # Enumerable that allows dynamic construction of the marshallable
    # objects.
    #
    # == responses ==
    #
    # Reading and writing are delegated to BidiCall#run_on_client, which is
    # handed two procs that mark the input and the output stream as done.
    #
    # If sending the initial metadata fails with a GRPC::Core::CallError,
    # the call status is received and checked (which may raise instead)
    # before the CallError is re-raised; in every failure case both streams
    # are marked as done.
    #
    # @param requests [Object] an Enumerable of requests to send
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Object] whatever BidiCall#run_on_client returns
    # @raise [GRPC::Core::CallError] if a client call was already executed
    #     on this ActiveCall
    def bidi_streamer(requests, metadata: {}, &blk)
      raise_error_if_already_executed
      # Metadata might have already been sent if this is an operation view
      begin
        send_initial_metadata(metadata)
      rescue GRPC::Core::CallError => e
        batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
        set_input_stream_done
        set_output_stream_done
        attach_status_results_and_complete_call(batch_result)
        raise e
      rescue => e
        set_input_stream_done
        set_output_stream_done
        raise e
      end

      bd = BidiCall.new(@call,
                        @marshal,
                        @unmarshal,
                        metadata_received: @metadata_received)

      bd.run_on_client(requests,
                       proc { set_input_stream_done },
                       proc { set_output_stream_done },
                       &blk)
    end

    # run_server_bidi orchestrates a BiDi stream processing on a server.
    #
    # N.B. mth is a func(Enumerable<Requests>)
    #
    # It takes an enumerable of requests as an arg, in case there is a
    # relationship between the stream of requests and the stream of replies.
    #
    # This does not mean that there must necessarily be one.  E.g, the
    # replies produced by mth could ignore the received requests.
    #
    # The handler is run through interception_ctx.intercept! with this
    # call's multi_req_view as the call.
    #
    # @param mth [Proc] generates the BiDi stream replies
    # @param interception_ctx [InterceptionContext]
    #
    def run_server_bidi(mth, interception_ctx)
      view = multi_req_view
      bidi_call = BidiCall.new(
        @call,
        @marshal,
        @unmarshal,
        metadata_received: @metadata_received,
        req_view: view
      )
      requests = bidi_call.read_next_loop(proc { set_input_stream_done }, false)
      interception_ctx.intercept!(
        :bidi_streamer,
        call: view,
        method: mth,
        requests: requests
      ) do
        bidi_call.run_on_server(mth, requests)
      end
    end

    # Waits till an operation completes.
    # Returns immediately when no notifier has been installed (see
    # #operation).
    def wait
      return if @op_notifier.nil?
      GRPC.logger.debug("active_call.wait: on #{@op_notifier}")
      @op_notifier.wait
    end

    # Signals that an operation is done.
    # Only relevant on the client-side (this is a no-op on the server-side)
    def op_is_done
      return if @op_notifier.nil?
      @op_notifier.notify(self)
    end

    # Add to the metadata that will be sent from the server.
    # Fails if metadata has already been sent.
    # Unused by client calls.
    #
    # @param new_metadata [Hash] entries merged into metadata_to_send
    # @return [Hash] the merged metadata_to_send
    # @raise [RuntimeError] if the initial metadata was already sent
    def merge_metadata_to_send(new_metadata = {})
      @send_initial_md_mutex.synchronize do
        fail('cant change metadata after already sent') if @metadata_sent
        @metadata_to_send.merge!(new_metadata)
      end
    end

    # Stores the peer certificate so that it is available through the
    # peer_cert reader.
    #
    # @param peer_cert [Object] the certificate to remember
    def attach_peer_cert(peer_cert)
      @peer_cert = peer_cert
    end

    # Merges the given metadata into the metadata to send and then sends the
    # initial metadata.
    #
    # This method is public because the Operation view delegates to it;
    # Forwardable emits a "forwarding to private method" warning when the
    # delegation target is private.
    #
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @raise [RuntimeError] (from merge_metadata_to_send) if the initial
    #     metadata was already sent
    def start_call(metadata = {})
      merge_metadata_to_send(metadata) && send_initial_metadata
    end

    private

    # To be called once the "input stream" has been completely
    # read through (i.e, done reading from client or received status)
    # note this is idempotent
    def set_input_stream_done
      @call_finished_mu.synchronize do
        @input_stream_done = true
        maybe_finish_and_close_call_locked
      end
    end

    # To be called once the "output stream" has been completely
    # sent through (i.e, done sending from client or sent status)
    # note this is idempotent
    def set_output_stream_done
      @call_finished_mu.synchronize do
        @output_stream_done = true
        maybe_finish_and_close_call_locked
      end
    end

    # Finishes the call once both streams are done: signals op_is_done and
    # closes the underlying call. The @call_finished flag makes sure this
    # happens at most once.
    #
    # Both callers in this class invoke it while holding @call_finished_mu.
    def maybe_finish_and_close_call_locked
      return unless @output_stream_done && @input_stream_done
      return if @call_finished
      @call_finished = true
      op_is_done
      @call.close
    end

    # Guards against running more than one client call on this ActiveCall:
    # the first invocation records that a call was executed, any later one
    # fails.
    #
    # @raise [GRPC::Core::CallError] on the second and later invocations
    def raise_error_if_already_executed
      @client_call_executed_mu.synchronize do
        if @client_call_executed
          fail GRPC::Core::CallError, 'attempting to re-run a call'
        end
        @client_call_executed = true
      end
    end

    # Builds an anonymous class whose instances wrap an ActiveCall and
    # forward only the listed methods to it.
    #
    # @param visible_methods [Array<Symbol>] names of the methods to expose
    # @return [Class] the generated view class
    def self.view_class(*visible_methods)
      Class.new do
        extend ::Forwardable
        def_delegators :@wrapped, *visible_methods

        # @param wrapped [ActiveCall] the call whose methods are shielded
        def initialize(wrapped)
          @wrapped = wrapped
        end
      end
    end

    # SingleReqView limits access to an ActiveCall's methods for use in server
    # handlers that receive just one request.
    SingleReqView = view_class(:cancelled?, :deadline, :metadata,
                               :output_metadata, :peer, :peer_cert,
                               :send_initial_metadata,
                               :metadata_to_send,
                               :merge_metadata_to_send,
                               :metadata_sent)

    # MultiReqView limits access to an ActiveCall's methods for use in
    # server client_streamer handlers.
    MultiReqView = view_class(:cancelled?, :deadline,
                              :each_remote_read, :metadata, :output_metadata,
                              :peer, :peer_cert,
                              :send_initial_metadata,
                              :metadata_to_send,
                              :merge_metadata_to_send,
                              :metadata_sent)

    # Operation limits access to an ActiveCall's methods for use as
    # an Operation on the client.
    #
    # NOTE(review): :execute is delegated here but no #execute method is
    # defined on ActiveCall in this file - confirm whether it is provided
    # elsewhere or is a leftover.
    Operation = view_class(:cancel, :cancelled?, :deadline, :execute,
                           :metadata, :status, :start_call, :wait, :write_flag,
                           :write_flag=, :trailing_metadata)

    # InterceptableView further limits access to an ActiveCall's methods
    # for use in interceptors on the client, exposing only the deadline
    InterceptableView = view_class(:deadline)
  end
end