# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require_relative 'active_call'
require_relative '../version'

# GRPC contains the General RPC module.
module GRPC
  # rubocop:disable Metrics/ParameterLists

  # ClientStub represents a client connection to a gRPC server, and can be used
  # to send requests.
  class ClientStub
    include Core::StatusCodes
    include Core::TimeConsts

    # Default timeout is infinity.
    DEFAULT_TIMEOUT = INFINITE_FUTURE

    # setup_channel is used by #initialize to construct a channel from its
    # arguments.
    #
    # When alt_chan is given it is type-checked and returned as-is; host,
    # creds and channel_args are then not consulted. Otherwise a new
    # Core::Channel is created whose 'grpc.primary_user_agent' argument has
    # "grpc-ruby/<VERSION>" appended to any caller-supplied value.
    #
    # The channel_args hash passed in is never modified: the user agent is
    # written to a shallow copy, so a hash that is shared between several
    # stubs does not accumulate repeated "grpc-ruby/..." suffixes.
    #
    # @param alt_chan [Core::Channel|nil] a pre-created channel, or nil
    # @param host [String] the host the channel connects to
    # @param creds [Core::ChannelCredentials|Symbol] the channel credentials
    # @param channel_args [Hash] the channel arguments
    # @return [Core::Channel] alt_chan, or a newly created channel
    # @raise [TypeError] if alt_chan is not a Core::Channel, or if creds is
    #   neither a Core::ChannelCredentials nor a Symbol
    def self.setup_channel(alt_chan, host, creds, channel_args = {})
      unless alt_chan.nil?
        fail(TypeError, '!Channel') unless alt_chan.is_a?(Core::Channel)
        return alt_chan
      end
      # Copy before editing so the caller's hash is left untouched.
      args = channel_args.dup
      user_agent = args['grpc.primary_user_agent']
      prefix = user_agent.nil? ? '' : user_agent + ' '
      args['grpc.primary_user_agent'] = prefix + "grpc-ruby/#{VERSION}"
      unless creds.is_a?(Core::ChannelCredentials) || creds.is_a?(Symbol)
        fail(TypeError, '!ChannelCredentials or Symbol')
      end
      Core::Channel.new(host, args, creds)
    end

    # Allows users of the stub to modify the propagate mask.
    #
    # This is an advanced feature for use when making calls to another gRPC
    # server whilst running in the handler of an existing one.
    attr_writer :propagate_mask

    # Creates a new ClientStub.
    #
    # Minimally, a stub is created with just the host of the gRPC service
    # it wishes to access, e.g.,
    #
    #   my_stub = ClientStub.new('example.host.com:50505',
    #                            :this_channel_is_insecure)
    #
    # If a channel_override argument is passed, it will be used as the
    # underlying channel. Otherwise, the channel_args argument will be used
    # to construct a new underlying channel.
    #
    # There are some specific keyword args that are not used to configure the
    # channel:
    #
    # - :channel_override
    # when present, this must be a pre-created GRPC::Core::Channel. If it's
    # present, the host, creds and channel_args are not used to construct a
    # channel, and the RPC connection uses this channel.
    #
    # - :timeout
    # when present, this is the default timeout used for calls
    #
    # @param host [String] the host the stub connects to
    # @param creds [Core::ChannelCredentials|Symbol] the channel credentials, or
    #     :this_channel_is_insecure, which explicitly indicates that the client
    #     should be created with an insecure connection. Note: this argument is
    #     ignored if the channel_override argument is provided.
    # @param channel_override [Core::Channel] a pre-created channel
    # @param timeout [Number] the default timeout to use in requests
    # @param propagate_mask [Number] A bitwise combination of flags in
    #     GRPC::Core::PropagateMasks. Indicates how data should be propagated
    #     from parent server calls to child client calls if this client is being
    #     used within a gRPC server.
    # @param channel_args [Hash] the channel arguments. Note: this argument is
    #     ignored if the channel_override argument is provided.
    # @param interceptors [Array<GRPC::ClientInterceptor>] An array of
    #     GRPC::ClientInterceptor objects that will be used for
    #     intercepting calls before they are executed
    #     Interceptors are an EXPERIMENTAL API.
    def initialize(host, creds,
                   channel_override: nil,
                   timeout: nil,
                   propagate_mask: nil,
                   channel_args: {},
                   interceptors: [])
      @ch = ClientStub.setup_channel(channel_override, host, creds,
                                     channel_args)
      # An SSL target entry in channel_args, when present, takes precedence
      # over the host argument for the value stored in @host.
      alt_host = channel_args[Core::Channel::SSL_TARGET]
      @host = alt_host.nil? ? host : alt_host
      @propagate_mask = propagate_mask
      @timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout
      @interceptors = InterceptorRegistry.new(interceptors)
    end

    # request_response sends a request to a GRPC server, and returns the
    # response.
    #
    # == Flow Control ==
    # This is a blocking call.
    #
    # * it does not return until a response is received.
    #
    # * the request is sent only when GRPC core's flow control allows it to
    #   be sent.
    #
    # == Errors ==
    # An error is raised if
    #
    # * the server responds with a non-OK status
    #
    # * the deadline is exceeded
    #
    # == Return Value ==
    #
    # If return_op is false, the call returns the response
    #
    # If return_op is true, the call returns an Operation, calling execute
    # on the Operation returns the response.
    #
    # @param method [String] the RPC method to call on the GRPC server
    # @param req [Object] the request sent to the server
    # @param marshal [Function] f(obj)->string that marshals requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Time] (optional) the time the request should complete
    # @param return_op [true|false] return an Operation if true
    # @param parent [Core::Call] a prior call whose reserved metadata
    #   will be propagated by this one.
    # @param credentials [Core::CallCredentials] credentials to use when making
    #   the call
    # @param metadata [Hash] metadata to be sent to the server
    # @return [Object|Operation] the response received from the server, or
    #   an Operation when return_op is true
    def request_response(method, req, marshal, unmarshal,
                         deadline: nil,
                         return_op: false,
                         parent: nil,
                         credentials: nil,
                         metadata: {})
      c = new_active_call(method, marshal, unmarshal,
                          deadline: deadline,
                          parent: parent,
                          credentials: credentials)
      interception_context = @interceptors.build_context
      intercept_args = {
        method: method,
        request: req,
        call: c.interceptable,
        metadata: metadata
      }
      if return_op
        # return the operation view of the active_call; define #execute as a
        # new method for this instance that invokes #request_response.
        c.merge_metadata_to_send(metadata)
        op = c.operation
        op.define_singleton_method(:execute) do
          interception_context.intercept!(:request_response, intercept_args) do
            c.request_response(req, metadata: metadata)
          end
        end
        op
      else
        interception_context.intercept!(:request_response, intercept_args) do
          c.request_response(req, metadata: metadata)
        end
      end
    end

    # client_streamer sends a stream of requests to a GRPC server, and
    # returns a single response.
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in typical case it will be an Enumerable
    # that allows dynamic construction of the marshallable objects.
    #
    # == Flow Control ==
    # This is a blocking call.
    #
    # * it does not return until a response is received.
    #
    # * each request is sent only when GRPC core's flow control allows it to
    #   be sent.
    #
    # == Errors ==
    # An error is raised if
    #
    # * the server responds with a non-OK status
    #
    # * the deadline is exceeded
    #
    # == Return Value ==
    #
    # If return_op is false, the call consumes the requests and returns
    # the response.
    #
    # If return_op is true, the call returns an Operation; calling execute
    # on the Operation consumes the requests and returns the response.
    #
    # @param method [String] the RPC method to call on the GRPC server
    # @param requests [Object] an Enumerable of requests to send
    # @param marshal [Function] f(obj)->string that marshals requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Time] (optional) the time the request should complete
    # @param return_op [true|false] return an Operation if true
    # @param parent [Core::Call] a prior call whose reserved metadata
    #   will be propagated by this one.
    # @param credentials [Core::CallCredentials] credentials to use when making
    #   the call
    # @param metadata [Hash] metadata to be sent to the server
    # @return [Object|Operation] the response received from the server, or
    #   an Operation when return_op is true
    def client_streamer(method, requests, marshal, unmarshal,
                        deadline: nil,
                        return_op: false,
                        parent: nil,
                        credentials: nil,
                        metadata: {})
      c = new_active_call(method, marshal, unmarshal,
                          deadline: deadline,
                          parent: parent,
                          credentials: credentials)
      interception_context = @interceptors.build_context
      intercept_args = {
        method: method,
        requests: requests,
        call: c.interceptable,
        metadata: metadata
      }
      if return_op
        # return the operation view of the active_call; define #execute as a
        # new method for this instance that invokes #client_streamer.
        # The metadata is merged into the call up front, so #execute does not
        # pass it again.
        c.merge_metadata_to_send(metadata)
        op = c.operation
        op.define_singleton_method(:execute) do
          interception_context.intercept!(:client_streamer, intercept_args) do
            c.client_streamer(requests)
          end
        end
        op
      else
        interception_context.intercept!(:client_streamer, intercept_args) do
          c.client_streamer(requests, metadata: metadata)
        end
      end
    end

    # server_streamer sends one request to the GRPC server, which yields a
    # stream of responses.
    #
    # responses provides an enumerator over the streamed responses, i.e. it
    # follows Ruby's #each iteration protocol. The enumerator blocks while
    # waiting for each response, stops when the server signals that no
    # further responses will be supplied. If the implicit block is provided,
    # it is executed with each response as the argument and no result is
    # returned.
    #
    # == Flow Control ==
    # This is a blocking call.
    #
    # * the request is sent only when GRPC core's flow control allows it to
    #   be sent.
    #
    # * the request will not complete until the server sends the final
    #   response followed by a status message.
    #
    # == Errors ==
    # An error is raised if
    #
    # * the server responds with a non-OK status when any response is
    #   retrieved
    #
    # * the deadline is exceeded
    #
    # == Return Value ==
    #
    # if the return_op is false, the return value is an Enumerator of the
    # results, unless a block is provided, in which case the block is
    # executed with each response.
    #
    # if return_op is true, the function returns an Operation whose #execute
    # method runs server streamer call. Again, Operation#execute either
    # calls the given block with each response or returns an Enumerator of the
    # responses.
    #
    # == Metadata ==
    #
    # Metadata to be sent to the server is supplied through the metadata
    # keyword argument.
    #
    # @param method [String] the RPC method to call on the GRPC server
    # @param req [Object] the request sent to the server
    # @param marshal [Function] f(obj)->string that marshals requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Time] (optional) the time the request should complete
    # @param return_op [true|false] return an Operation if true
    # @param parent [Core::Call] a prior call whose reserved metadata
    #   will be propagated by this one.
    # @param credentials [Core::CallCredentials] credentials to use when making
    #   the call
    # @param metadata [Hash] metadata to be sent to the server
    # @param blk [Block] when provided, is executed for each response
    # @return [Enumerator|Operation|nil] as discussed above
    def server_streamer(method, req, marshal, unmarshal,
                        deadline: nil,
                        return_op: false,
                        parent: nil,
                        credentials: nil,
                        metadata: {},
                        &blk)
      c = new_active_call(method, marshal, unmarshal,
                          deadline: deadline,
                          parent: parent,
                          credentials: credentials)
      interception_context = @interceptors.build_context
      intercept_args = {
        method: method,
        request: req,
        call: c.interceptable,
        metadata: metadata
      }
      if return_op
        # return the operation view of the active_call; define #execute
        # as a new method for this instance that invokes #server_streamer.
        # The metadata is merged into the call up front, so #execute does not
        # pass it again.
        c.merge_metadata_to_send(metadata)
        op = c.operation
        op.define_singleton_method(:execute) do
          interception_context.intercept!(:server_streamer, intercept_args) do
            c.server_streamer(req, &blk)
          end
        end
        op
      else
        interception_context.intercept!(:server_streamer, intercept_args) do
          c.server_streamer(req, metadata: metadata, &blk)
        end
      end
    end

    # bidi_streamer sends a stream of requests to the GRPC server, and yields
    # a stream of responses.
    #
    # This method takes an Enumerable of requests, and returns an enumerable
    # of responses.
    #
    # == requests ==
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in typical case it will be an
    # Enumerable that allows dynamic construction of the marshallable
    # objects.
    #
    # == responses ==
    #
    # This is an enumerator of responses. I.e., its #next method blocks
    # waiting for the next response. Also, if at any point the block needs
    # to consume all the remaining responses, this can be done using #each or
    # #collect. Calling #each or #collect should only be done if
    # the_call#writes_done has been called, otherwise the block will loop
    # forever.
    #
    # == Flow Control ==
    # This is a blocking call.
    #
    # * the call completes when the next call to provided block returns
    #   false
    #
    # * the execution block parameters are two objects for sending and
    #   receiving responses, each of which blocks waiting for flow control.
    #   E.g., calls to bidi_call#remote_send will wait until flow control
    #   allows another write before returning; and obviously calls to
    #   responses#next block until the next response is available.
    #
    # == Termination ==
    #
    # As well as sending and receiving messages, the block passed to the
    # function is also responsible for:
    #
    # * calling bidi_call#writes_done to indicate no further reqs will be
    #   sent.
    #
    # * returning false once the bidi stream is functionally completed.
    #
    # Note that response#next will indicate that there are no further
    # responses by throwing StopIteration, but this can only happen if
    # bidi_call#writes_done is called.
    #
    # To properly terminate the RPC, the responses should be completely iterated
    # through; one way to do this is to loop on responses#next until no further
    # responses are available.
    #
    # == Errors ==
    # An error is raised if
    #
    # * the server responds with a non-OK status when any response is
    #   retrieved
    #
    # * the deadline is exceeded
    #
    #
    # == Return Value ==
    #
    # if the return_op is false, the return value is an Enumerator of the
    # results, unless a block is provided, in which case the block is
    # executed with each response.
    #
    # if return_op is true, the function returns an Operation whose #execute
    # method runs the Bidi call. Again, Operation#execute either calls a
    # given block with each response or returns an Enumerator of the
    # responses.
    #
    # @param method [String] the RPC method to call on the GRPC server
    # @param requests [Object] an Enumerable of requests to send
    # @param marshal [Function] f(obj)->string that marshals requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Time] (optional) the time the request should complete
    # @param return_op [true|false] return an Operation if true
    # @param parent [Core::Call] a prior call whose reserved metadata
    #   will be propagated by this one.
    # @param credentials [Core::CallCredentials] credentials to use when making
    #   the call
    # @param metadata [Hash] metadata to be sent to the server
    # @param blk [Block] when provided, is executed for each response
    # @return [Enumerator|nil|Operation] as discussed above
    def bidi_streamer(method, requests, marshal, unmarshal,
                      deadline: nil,
                      return_op: false,
                      parent: nil,
                      credentials: nil,
                      metadata: {},
                      &blk)
      c = new_active_call(method, marshal, unmarshal,
                          deadline: deadline,
                          parent: parent,
                          credentials: credentials)
      interception_context = @interceptors.build_context
      intercept_args = {
        method: method,
        requests: requests,
        call: c.interceptable,
        metadata: metadata
      }
      if return_op
        # return the operation view of the active_call; define #execute
        # as a new method for this instance that invokes #bidi_streamer.
        # The metadata is merged into the call up front, so #execute does not
        # pass it again.
        c.merge_metadata_to_send(metadata)
        op = c.operation
        op.define_singleton_method(:execute) do
          interception_context.intercept!(:bidi_streamer, intercept_args) do
            c.bidi_streamer(requests, &blk)
          end
        end
        op
      else
        interception_context.intercept!(:bidi_streamer, intercept_args) do
          c.bidi_streamer(requests, metadata: metadata, &blk)
        end
      end
    end

    private

    # Creates a new, not yet started ActiveCall on the stub's channel.
    #
    # @param method [string] the method being called.
    # @param marshal [Function] f(obj)->string that marshals requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Time] (optional) the time the request should complete;
    #   when nil it is derived from the stub's default timeout
    # @param parent [Grpc::Call] a parent call, available when calls are
    #   made from server
    # @param credentials [Core::CallCredentials] credentials to use when making
    #   the call
    # @return [ActiveCall] the call wrapper, created with started: false
    def new_active_call(method, marshal, unmarshal,
                        deadline: nil,
                        parent: nil,
                        credentials: nil)
      # from_relative_time is not defined in this file; presumably it is
      # provided by the included Core::TimeConsts - confirm.
      deadline = from_relative_time(@timeout) if deadline.nil?
      call = @ch.create_call(parent, # parent call
                             @propagate_mask, # propagation options
                             method,
                             nil, # host use nil,
                             deadline)
      call.set_credentials! credentials unless credentials.nil?
      ActiveCall.new(call, marshal, unmarshal, deadline,
                     started: false)
    end
  end
end