# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'forwardable'
require 'weakref'
require_relative 'bidi_call'

class Struct
  # BatchResult is the struct returned by calls to call#start_batch.
  class BatchResult
    # Returns the received status, raising an error when the status is
    # present and not OK.
    #
    # @return [Struct::Status, nil] the status, or nil if none was received
    # @raise [GRPC::BadStatus] when the status code is not OK
    def check_status
      current = status
      return nil if current.nil?
      unless current.code == GRPC::Core::StatusCodes::OK
        GRPC.logger.debug("Failing with status #{current}")
        # Propagate the status (and its metadata, if present) as an exception.
        raise GRPC::BadStatus.new_status_exception(
          current.code, current.details, current.metadata,
          current.debug_error_string)
      end
      current
    end
  end
end
37
38# GRPC contains the General RPC module.
39module GRPC
40  # The ActiveCall class provides simple methods for sending marshallable
41  # data to a call
42  class ActiveCall # rubocop:disable Metrics/ClassLength
43    include Core::TimeConsts
44    include Core::CallOps
45    extend Forwardable
46    attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert
47    def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
48                   :trailing_metadata, :status
49
50    # client_invoke begins a client invocation.
51    #
52    # Flow Control note: this blocks until flow control accepts that client
53    # request can go ahead.
54    #
55    # deadline is the absolute deadline for the call.
56    #
57    # == Keyword Arguments ==
58    # any keyword arguments are treated as metadata to be sent to the server
59    # if a keyword value is a list, multiple metadata for it's key are sent
60    #
61    # @param call [Call] a call on which to start and invocation
62    # @param metadata [Hash] the metadata
63    def self.client_invoke(call, metadata = {})
64      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
65      call.run_batch(SEND_INITIAL_METADATA => metadata)
66    end
67
    # Creates an ActiveCall.
    #
    # ActiveCall should only be created after a call is accepted.  That
    # means different things on a client and a server.  On the client, the
    # call is accepted after calling call.invoke.  On the server, this is
    # after call.accept.
    #
    # #initialize cannot determine if the call is accepted or not; so if a
    # call that's not accepted is used here, the error won't be visible until
    # the ActiveCall methods are called.
    #
    # deadline is the absolute deadline for the call.
    #
    # @param call [Call] the call used by the ActiveCall
    # @param marshal [Function] f(obj)->string that marshals requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Fixnum] the deadline for the call to complete
    # @param started [true|false] indicates that metadata was sent
    # @param metadata_received [true|false] indicates if metadata has already
    #     been received. Should always be true for server calls
    # @param metadata_to_send [Hash, nil] metadata to send later; must be nil
    #     when started is true (the metadata has already gone out)
    def initialize(call, marshal, unmarshal, deadline, started: true,
                   metadata_received: false, metadata_to_send: nil)
      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
      @call = call
      @deadline = deadline
      @marshal = marshal
      @unmarshal = unmarshal
      @metadata_received = metadata_received
      @metadata_sent = started
      @op_notifier = nil

      # Supplying metadata_to_send is an error once metadata has been sent.
      fail(ArgumentError, 'Already sent md') if started && metadata_to_send
      @metadata_to_send = metadata_to_send || {} unless started
      @send_initial_md_mutex = Mutex.new

      # Stream/call lifecycle flags, guarded by @call_finished_mu so the
      # underlying call is closed exactly once after both streams complete.
      @output_stream_done = false
      @input_stream_done = false
      @call_finished = false
      @call_finished_mu = Mutex.new

      @client_call_executed = false
      @client_call_executed_mu = Mutex.new

      # set the peer now so that the accessor can still function
      # after the server closes the call
      @peer = call.peer
    end
115
    # Sends the initial metadata that has yet to be sent.
    # Does nothing if metadata has already been sent for this call.
    #
    # @param new_metadata [Hash] extra metadata merged into the pending
    #     metadata before it is sent
    def send_initial_metadata(new_metadata = {})
      @send_initial_md_mutex.synchronize do
        # @metadata_sent is only read/written under this mutex, which
        # prevents a double send when multiple threads race here.
        return if @metadata_sent
        @metadata_to_send.merge!(new_metadata)
        ActiveCall.client_invoke(@call, @metadata_to_send)
        @metadata_sent = true
      end
    end
126
127    # output_metadata are provides access to hash that can be used to
128    # save metadata to be sent as trailer
129    def output_metadata
130      @output_metadata ||= {}
131    end
132
133    # cancelled indicates if the call was cancelled
134    def cancelled?
135      !@call.status.nil? && @call.status.code == Core::StatusCodes::CANCELLED
136    end
137
    # multi_req_view provides a restricted view of this ActiveCall for use
    # in a server client-streaming handler.
    #
    # @return [MultiReqView]
    def multi_req_view
      MultiReqView.new(self)
    end

    # single_req_view provides a restricted view of this ActiveCall for use in
    # a server request-response handler.
    #
    # @return [SingleReqView]
    def single_req_view
      SingleReqView.new(self)
    end

    # operation provides a restricted view of this ActiveCall for use as
    # an Operation.
    #
    # @return [Operation]
    def operation
      # Installing a notifier here is what allows #wait to block until the
      # operation completes (see #wait and #op_is_done).
      @op_notifier = Notifier.new
      Operation.new(self)
    end

    ##
    # Returns a restricted view of this ActiveCall for use in interceptors
    #
    # @return [InterceptableView]
    #
    def interceptable
      InterceptableView.new(self)
    end
165
    # Waits for the status from the server, marks the input stream as done,
    # and raises BadStatus if the received status is not OK.
    def receive_and_check_status
      batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
      # Receiving the status means nothing further will arrive on this call.
      set_input_stream_done
      attach_status_results_and_complete_call(batch_result)
    end
171
172    def attach_status_results_and_complete_call(recv_status_batch_result)
173      unless recv_status_batch_result.status.nil?
174        @call.trailing_metadata = recv_status_batch_result.status.metadata
175      end
176      @call.status = recv_status_batch_result.status
177
178      # The RECV_STATUS in run_batch always succeeds
179      # Check the status for a bad status or failed run batch
180      recv_status_batch_result.check_status
181    end
182
    # remote_send sends a request to the remote endpoint.
    #
    # It blocks until the remote endpoint accepts the message.
    #
    # @param req [Object, String] the object to send or its marshalled form.
    # @param marshalled [false, true] indicates if the object is already
    # marshalled.
    def remote_send(req, marshalled = false)
      # Ensure initial metadata goes out before (or with) the first message.
      send_initial_metadata
      GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
      payload = marshalled ? req : @marshal.call(req)
      @call.run_batch(SEND_MESSAGE => payload)
    end
196
    # send_status sends a status to the remote endpoint.
    #
    # @param code [int] the status code to send
    # @param details [String] details
    # @param assert_finished [true, false] when true, also waits for the
    # client's FINISHED (close) in the same batch; defaults to false
    # @param metadata [Hash] metadata to send to the server. If a value is a
    # list, multiple metadata for its key are sent
    # @return [nil]
    def send_status(code = OK, details = '', assert_finished = false,
                    metadata: {})
      send_initial_metadata
      ops = {
        SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata)
      }
      ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
      @call.run_batch(ops)
      # Sending the status completes the server's output stream.
      set_output_stream_done

      nil
    end
217
218    # Intended for use on server-side calls when a single request from
219    # the client is expected (i.e., unary and server-streaming RPC types).
220    def read_unary_request
221      req = remote_read
222      set_input_stream_done
223      req
224    end
225
    # Sends a single response together with the trailing status on a server
    # call, all in one batch.
    #
    # @param req [Object] the response message to marshal and send
    # @param trailing_metadata [Hash] metadata sent along with the status
    # @param code [Integer] the status code (defaults to OK)
    # @param details [String] the status details string
    def server_unary_response(req, trailing_metadata: {},
                              code: Core::StatusCodes::OK, details: 'OK')
      ops = {}
      @send_initial_md_mutex.synchronize do
        # Piggy-back the initial metadata on this batch unless already sent.
        ops[SEND_INITIAL_METADATA] = @metadata_to_send unless @metadata_sent
        @metadata_sent = true
      end

      payload = @marshal.call(req)
      ops[SEND_MESSAGE] = payload
      ops[SEND_STATUS_FROM_SERVER] = Struct::Status.new(
        code, details, trailing_metadata)
      ops[RECV_CLOSE_ON_SERVER] = nil

      @call.run_batch(ops)
      # Sending the status completes the server's output stream.
      set_output_stream_done
    end
243
    # remote_read reads a response from the remote endpoint.
    #
    # It blocks until the remote endpoint replies with a message or status.
    # On receiving a message, it returns the response after unmarshalling it.
    # On receiving a status, it returns nil if the status is OK, otherwise
    # raising BadStatus
    def remote_read
      ops = { RECV_MESSAGE => nil }
      # On the first read of a client call, also pick up initial metadata.
      ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
      batch_result = @call.run_batch(ops)
      unless @metadata_received
        @call.metadata = batch_result.metadata
        @metadata_received = true
      end
      get_message_from_batch_result(batch_result)
    end
260
261    def get_message_from_batch_result(recv_message_batch_result)
262      unless recv_message_batch_result.nil? ||
263             recv_message_batch_result.message.nil?
264        return @unmarshal.call(recv_message_batch_result.message)
265      end
266      GRPC.logger.debug('found nil; the final response has been sent')
267      nil
268    end
269
    # each_remote_read passes each response to the given block or returns an
    # enumerator of the responses if no block is given.
    # Used to generate the request enumerable for
    # server-side client-streaming RPC's.
    #
    # == Enumerator ==
    #
    # * #next blocks until the remote endpoint sends a READ or FINISHED
    # * for each read, enumerator#next yields the response
    # * on status
    #    * if it is OK, enumerator#next raises StopException
    #    * if it is not OK, enumerator#next raises RuntimeException
    #
    # == Block ==
    #
    # * if provided it is executed for each response
    # * the call blocks until no more responses are provided
    #
    # @return [Enumerator] if no block was given
    def each_remote_read
      return enum_for(:each_remote_read) unless block_given?
      begin
        loop do
          resp = remote_read
          break if resp.nil?  # the last response was received
          yield resp
        end
      ensure
        # Mark the input stream done even if the loop exits via an exception.
        set_input_stream_done
      end
    end
301
    # each_remote_read_then_finish passes each response to the given block or
    # returns an enumerator of the responses if no block is given.
    #
    # It is like each_remote_read, but it blocks on finishing on detecting
    # the final message.
    #
    # == Enumerator ==
    #
    # * #next blocks until the remote endpoint sends a READ or FINISHED
    # * for each read, enumerator#next yields the response
    # * on status
    #    * if it is OK, enumerator#next raises StopException
    #    * if it is not OK, enumerator#next raises RuntimeException
    #
    # == Block ==
    #
    # * if provided it is executed for each response
    # * the call blocks until no more responses are provided
    #
    # @return [Enumerator] if no block was given
    def each_remote_read_then_finish
      return enum_for(:each_remote_read_then_finish) unless block_given?
      loop do
        resp =
          begin
            remote_read
          rescue GRPC::Core::CallError => e
            # Treat a CallError during a read as end-of-stream; the status
            # check below will surface any real failure.
            GRPC.logger.warn("In each_remote_read_then_finish: #{e}")
            nil
          end

        break if resp.nil?  # the last response was received
        yield resp
      end

      receive_and_check_status
    ensure
      set_input_stream_done
    end
341
    # request_response sends a request to a GRPC server, and returns the
    # response.
    #
    # @param req [Object] the request sent to the server
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Object] the response received from the server
    def request_response(req, metadata: {})
      raise_error_if_already_executed
      ops = {
        SEND_MESSAGE => @marshal.call(req),
        SEND_CLOSE_FROM_CLIENT => nil,
        RECV_INITIAL_METADATA => nil,
        RECV_MESSAGE => nil,
        RECV_STATUS_ON_CLIENT => nil
      }
      @send_initial_md_mutex.synchronize do
        # Metadata might have already been sent if this is an operation view
        unless @metadata_sent
          ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
        end
        @metadata_sent = true
      end

      begin
        batch_result = @call.run_batch(ops)
        # no need to check for cancellation after a CallError because this
        # batch contains a RECV_STATUS op
      ensure
        # Both streams are complete once this single batch has finished.
        set_input_stream_done
        set_output_stream_done
      end

      @call.metadata = batch_result.metadata
      attach_status_results_and_complete_call(batch_result)
      get_message_from_batch_result(batch_result)
    end
379
    # client_streamer sends a stream of requests to a GRPC server, and
    # returns a single response.
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in typical case it will be an Enumerable
    # that allows dynamic construction of the marshallable objects.
    #
    # @param requests [Object] an Enumerable of requests to send
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Object] the response received from the server
    def client_streamer(requests, metadata: {})
      raise_error_if_already_executed
      begin
        send_initial_metadata(metadata)
        requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
      rescue GRPC::Core::CallError => e
        receive_and_check_status # check for Cancelled
        raise e
      rescue => e
        set_input_stream_done
        raise e
      ensure
        set_output_stream_done
      end

      batch_result = @call.run_batch(
        SEND_CLOSE_FROM_CLIENT => nil,
        RECV_INITIAL_METADATA => nil,
        RECV_MESSAGE => nil,
        RECV_STATUS_ON_CLIENT => nil
      )

      # The status has been received, so no further input is expected.
      set_input_stream_done

      @call.metadata = batch_result.metadata
      attach_status_results_and_complete_call(batch_result)
      get_message_from_batch_result(batch_result)
    end
420
    # server_streamer sends one request to the GRPC server, which yields a
    # stream of responses.
    #
    # responses provides an enumerator over the streamed responses, i.e. it
    # follows Ruby's #each iteration protocol.  The enumerator blocks while
    # waiting for each response, stops when the server signals that no
    # further responses will be supplied.  If the implicit block is provided,
    # it is executed with each response as the argument and no result is
    # returned.
    #
    # @param req [Object] the request sent to the server
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Enumerator|nil] a response Enumerator
    def server_streamer(req, metadata: {})
      raise_error_if_already_executed
      ops = {
        SEND_MESSAGE => @marshal.call(req),
        SEND_CLOSE_FROM_CLIENT => nil
      }
      @send_initial_md_mutex.synchronize do
        # Metadata might have already been sent if this is an operation view
        unless @metadata_sent
          ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
        end
        @metadata_sent = true
      end

      begin
        @call.run_batch(ops)
      rescue GRPC::Core::CallError => e
        receive_and_check_status # checks for Cancelled
        raise e
      rescue => e
        set_input_stream_done
        raise e
      ensure
        set_output_stream_done
      end

      # The input stream is closed by each_remote_read_then_finish when the
      # responses are consumed.
      replies = enum_for(:each_remote_read_then_finish)
      return replies unless block_given?
      replies.each { |r| yield r }
    end
465
    # bidi_streamer sends a stream of requests to the GRPC server, and yields
    # a stream of responses.
    #
    # This method takes an Enumerable of requests, and returns an enumerable
    # of responses.
    #
    # == requests ==
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in typical case it will be an
    # Enumerable that allows dynamic construction of the marshallable
    # objects.
    #
    # == responses ==
    #
    # This is an enumerator of responses.  I.e, its #next method blocks
    # waiting for the next response.  Also, if at any point the block needs
    # to consume all the remaining responses, this can be done using #each or
    # #collect.  Calling #each or #collect should only be done if
    # the_call#writes_done has been called, otherwise the block will loop
    # forever.
    #
    # @param requests [Object] an Enumerable of requests to send
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Enumerator, nil] a response Enumerator
    def bidi_streamer(requests, metadata: {}, &blk)
      raise_error_if_already_executed
      # Metadata might have already been sent if this is an operation view
      begin
        send_initial_metadata(metadata)
      rescue GRPC::Core::CallError => e
        # A CallError here means the call failed outright; fetch the final
        # status so the failure is surfaced, then re-raise.
        batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
        set_input_stream_done
        set_output_stream_done
        attach_status_results_and_complete_call(batch_result)
        raise e
      rescue => e
        set_input_stream_done
        set_output_stream_done
        raise e
      end

      bd = BidiCall.new(@call,
                        @marshal,
                        @unmarshal,
                        metadata_received: @metadata_received)

      # The BidiCall invokes the provided procs when each stream completes.
      bd.run_on_client(requests,
                       proc { set_input_stream_done },
                       proc { set_output_stream_done },
                       &blk)
    end
520
    # run_server_bidi orchestrates a BiDi stream processing on a server.
    #
    # N.B. gen_each_reply is a func(Enumerable<Requests>)
    #
    # It takes an enumerable of requests as an arg, in case there is a
    # relationship between the stream of requests and the stream of replies.
    #
    # This does not mean that there must necessarily be one.  E.g, the
    # replies produced by gen_each_reply could ignore the received_msgs
    #
    # @param mth [Proc] generates the BiDi stream replies
    # @param interception_ctx [InterceptionContext]
    #
    def run_server_bidi(mth, interception_ctx)
      view = multi_req_view
      bidi_call = BidiCall.new(
        @call,
        @marshal,
        @unmarshal,
        metadata_received: @metadata_received,
        req_view: view
      )
      # Lazily reads requests; marks the input stream done when exhausted.
      requests = bidi_call.read_next_loop(proc { set_input_stream_done }, false)
      interception_ctx.intercept!(
        :bidi_streamer,
        call: view,
        method: mth,
        requests: requests
      ) do
        bidi_call.run_on_server(mth, requests)
      end
    end
553
554    # Waits till an operation completes
555    def wait
556      return if @op_notifier.nil?
557      GRPC.logger.debug("active_call.wait: on #{@op_notifier}")
558      @op_notifier.wait
559    end
560
561    # Signals that an operation is done.
562    # Only relevant on the client-side (this is a no-op on the server-side)
563    def op_is_done
564      return if @op_notifier.nil?
565      @op_notifier.notify(self)
566    end
567
568    # Add to the metadata that will be sent from the server.
569    # Fails if metadata has already been sent.
570    # Unused by client calls.
571    def merge_metadata_to_send(new_metadata = {})
572      @send_initial_md_mutex.synchronize do
573        fail('cant change metadata after already sent') if @metadata_sent
574        @metadata_to_send.merge!(new_metadata)
575      end
576    end
577
    # Records the peer's certificate so it stays available through the
    # peer_cert reader for the lifetime of this object.
    def attach_peer_cert(peer_cert)
      @peer_cert = peer_cert
    end
581
582    private
583
    # To be called once the "input stream" has been completely
    # read through (i.e, done reading from client or received status)
    # note this is idempotent
    def set_input_stream_done
      @call_finished_mu.synchronize do
        @input_stream_done = true
        # Closes the call if the output stream has also completed.
        maybe_finish_and_close_call_locked
      end
    end
593
    # To be called once the "output stream" has been completely
    # sent through (i.e, done sending from client or sent status)
    # note this is idempotent
    def set_output_stream_done
      @call_finished_mu.synchronize do
        @output_stream_done = true
        # Closes the call if the input stream has also completed.
        maybe_finish_and_close_call_locked
      end
    end
603
604    def maybe_finish_and_close_call_locked
605      return unless @output_stream_done && @input_stream_done
606      return if @call_finished
607      @call_finished = true
608      op_is_done
609      @call.close
610    end
611
    # Starts the call if not already started
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    def start_call(metadata = {})
      # merge_metadata_to_send returns the merged hash (truthy), so the
      # initial metadata is always sent right afterwards.
      merge_metadata_to_send(metadata) && send_initial_metadata
    end
618
619    def raise_error_if_already_executed
620      @client_call_executed_mu.synchronize do
621        if @client_call_executed
622          fail GRPC::Core::CallError, 'attempting to re-run a call'
623        end
624        @client_call_executed = true
625      end
626    end
627
    # Builds an anonymous view class that exposes only the given methods of
    # a wrapped ActiveCall, delegating each of them to the wrapped instance.
    #
    # @param visible_methods [Array<Symbol>] the methods the view exposes
    # @return [Class] the restricted view class
    def self.view_class(*visible_methods)
      Class.new do
        extend ::Forwardable
        def_delegators :@wrapped, *visible_methods

        # @param wrapped [ActiveCall] the call whose methods are shielded
        def initialize(wrapped)
          @wrapped = wrapped
        end
      end
    end
639
    # SingleReqView limits access to an ActiveCall's methods for use in server
    # handlers that receive just one request.
    SingleReqView = view_class(:cancelled?, :deadline, :metadata,
                               :output_metadata, :peer, :peer_cert,
                               :send_initial_metadata,
                               :metadata_to_send,
                               :merge_metadata_to_send,
                               :metadata_sent)

    # MultiReqView limits access to an ActiveCall's methods for use in
    # server client_streamer handlers; it additionally exposes
    # each_remote_read so the handler can consume the request stream.
    MultiReqView = view_class(:cancelled?, :deadline,
                              :each_remote_read, :metadata, :output_metadata,
                              :peer, :peer_cert,
                              :send_initial_metadata,
                              :metadata_to_send,
                              :merge_metadata_to_send,
                              :metadata_sent)

    # Operation limits access to an ActiveCall's methods for use as
    # an Operation on the client.
    Operation = view_class(:cancel, :cancelled?, :deadline, :execute,
                           :metadata, :status, :start_call, :wait, :write_flag,
                           :write_flag=, :trailing_metadata)

    # InterceptableView further limits access to an ActiveCall's methods
    # for use in interceptors on the client, exposing only the deadline
    InterceptableView = view_class(:deadline)
668  end
669end
670