1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15require 'forwardable' 16require_relative '../grpc' 17 18# GRPC contains the General RPC module. 19module GRPC 20 # The BiDiCall class orchestrates exection of a BiDi stream on a client or 21 # server. 22 class BidiCall 23 include Core::CallOps 24 include Core::StatusCodes 25 include Core::TimeConsts 26 27 # Creates a BidiCall. 28 # 29 # BidiCall should only be created after a call is accepted. That means 30 # different things on a client and a server. On the client, the call is 31 # accepted after call.invoke. On the server, this is after call.accept. 32 # 33 # #initialize cannot determine if the call is accepted or not; so if a 34 # call that's not accepted is used here, the error won't be visible until 35 # the BidiCall#run is called. 36 # 37 # deadline is the absolute deadline for the call. 38 # 39 # @param call [Call] the call used by the ActiveCall 40 # @param marshal [Function] f(obj)->string that marshal requests 41 # @param unmarshal [Function] f(string)->obj that unmarshals responses 42 # @param metadata_received [true|false] indicates if metadata has already 43 # been received. Should always be true for server calls 44 def initialize(call, marshal, unmarshal, metadata_received: false, 45 req_view: nil) 46 fail(ArgumentError, 'not a call') unless call.is_a? Core::Call 47 @call = call 48 @marshal = marshal 49 @op_notifier = nil # signals completion on clients 50 @unmarshal = unmarshal 51 @metadata_received = metadata_received 52 @reads_complete = false 53 @writes_complete = false 54 @complete = false 55 @done_mutex = Mutex.new 56 @req_view = req_view 57 end 58 59 # Begins orchestration of the Bidi stream for a client sending requests. 60 # 61 # The method either returns an Enumerator of the responses, or accepts a 62 # block that can be invoked with each response. 63 # 64 # @param requests the Enumerable of requests to send 65 # @param set_input_stream_done [Proc] called back when we're done 66 # reading the input stream 67 # @param set_output_stream_done [Proc] called back when we're done 68 # sending data on the output stream 69 # @return an Enumerator of requests to yield 70 def run_on_client(requests, 71 set_input_stream_done, 72 set_output_stream_done, 73 &blk) 74 @enq_th = Thread.new do 75 write_loop(requests, set_output_stream_done: set_output_stream_done) 76 end 77 read_loop(set_input_stream_done, &blk) 78 end 79 80 # Begins orchestration of the Bidi stream for a server generating replies. 81 # 82 # N.B. gen_each_reply is a func(Enumerable<Requests>) 83 # 84 # It takes an enumerable of requests as an arg, in case there is a 85 # relationship between the stream of requests and the stream of replies. 86 # 87 # This does not mean that must necessarily be one. E.g, the replies 88 # produced by gen_each_reply could ignore the received_msgs 89 # 90 # @param [Proc] gen_each_reply generates the BiDi stream replies. 91 # @param [Enumerable] requests The enumerable of requests to run 92 def run_on_server(gen_each_reply, requests) 93 replies = nil 94 95 # Pass in the optional call object parameter if possible 96 if gen_each_reply.arity == 1 97 replies = gen_each_reply.call(requests) 98 elsif gen_each_reply.arity == 2 99 replies = gen_each_reply.call(requests, @req_view) 100 else 101 fail 'Illegal arity of reply generator' 102 end 103 104 write_loop(replies, is_client: false) 105 end 106 107 ## 108 # Read the next stream iteration 109 # 110 # @param [Proc] finalize_stream callback to call when the reads have been 111 # completely read through. 112 # @param [Boolean] is_client If this is a client or server request 113 # 114 def read_next_loop(finalize_stream, is_client = false) 115 read_loop(finalize_stream, is_client: is_client) 116 end 117 118 private 119 120 END_OF_READS = :end_of_reads 121 END_OF_WRITES = :end_of_writes 122 123 # performs a read using @call.run_batch, ensures metadata is set up 124 def read_using_run_batch 125 ops = { RECV_MESSAGE => nil } 126 ops[RECV_INITIAL_METADATA] = nil unless @metadata_received 127 begin 128 batch_result = @call.run_batch(ops) 129 unless @metadata_received 130 @call.metadata = batch_result.metadata 131 @metadata_received = true 132 end 133 batch_result 134 rescue GRPC::Core::CallError => e 135 GRPC.logger.warn('bidi call: read_using_run_batch failed') 136 GRPC.logger.warn(e) 137 nil 138 end 139 end 140 141 # set_output_stream_done is relevant on client-side 142 def write_loop(requests, is_client: true, set_output_stream_done: nil) 143 GRPC.logger.debug('bidi-write-loop: starting') 144 count = 0 145 requests.each do |req| 146 GRPC.logger.debug("bidi-write-loop: #{count}") 147 count += 1 148 payload = @marshal.call(req) 149 # Fails if status already received 150 begin 151 @req_view.send_initial_metadata unless @req_view.nil? 152 @call.run_batch(SEND_MESSAGE => payload) 153 rescue GRPC::Core::CallError => e 154 # This is almost definitely caused by a status arriving while still 155 # writing. Don't re-throw the error 156 GRPC.logger.warn('bidi-write-loop: ended with error') 157 GRPC.logger.warn(e) 158 break 159 end 160 end 161 GRPC.logger.debug("bidi-write-loop: #{count} writes done") 162 if is_client 163 GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") 164 begin 165 @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil) 166 rescue GRPC::Core::CallError => e 167 GRPC.logger.warn('bidi-write-loop: send close failed') 168 GRPC.logger.warn(e) 169 end 170 GRPC.logger.debug('bidi-write-loop: done') 171 end 172 GRPC.logger.debug('bidi-write-loop: finished') 173 rescue StandardError => e 174 GRPC.logger.warn('bidi-write-loop: failed') 175 GRPC.logger.warn(e) 176 if is_client 177 @call.cancel_with_status(GRPC::Core::StatusCodes::UNKNOWN, 178 "GRPC bidi call error: #{e.inspect}") 179 else 180 raise e 181 end 182 ensure 183 set_output_stream_done.call if is_client 184 end 185 186 # Provides an enumerator that yields results of remote reads 187 def read_loop(set_input_stream_done, is_client: true) 188 return enum_for(:read_loop, 189 set_input_stream_done, 190 is_client: is_client) unless block_given? 191 GRPC.logger.debug('bidi-read-loop: starting') 192 begin 193 count = 0 194 # queue the initial read before beginning the loop 195 loop do 196 GRPC.logger.debug("bidi-read-loop: #{count}") 197 count += 1 198 batch_result = read_using_run_batch 199 200 # handle the next message 201 if batch_result.nil? || batch_result.message.nil? 202 GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") 203 204 if is_client 205 batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) 206 @call.status = batch_result.status 207 @call.trailing_metadata = @call.status.metadata if @call.status 208 GRPC.logger.debug("bidi-read-loop: done status #{@call.status}") 209 batch_result.check_status 210 end 211 212 GRPC.logger.debug('bidi-read-loop: done reading!') 213 break 214 end 215 216 res = @unmarshal.call(batch_result.message) 217 yield res 218 end 219 rescue StandardError => e 220 GRPC.logger.warn('bidi: read-loop failed') 221 GRPC.logger.warn(e) 222 raise e 223 ensure 224 set_input_stream_done.call 225 end 226 GRPC.logger.debug('bidi-read-loop: finished') 227 # Make sure that the write loop is done done before finishing the call. 228 # Note that blocking is ok at this point because we've already received 229 # a status 230 @enq_th.join if is_client 231 end 232 end 233end 234