1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15require_relative '../grpc' 16 17# GRPC contains the General RPC module. 18module GRPC 19 # RpcDesc is a Descriptor of an RPC method. 20 class RpcDesc < Struct.new(:name, :input, :output, :marshal_method, 21 :unmarshal_method) 22 include Core::StatusCodes 23 24 # Used to wrap a message class to indicate that it needs to be streamed. 25 class Stream 26 attr_accessor :type 27 28 def initialize(type) 29 @type = type 30 end 31 end 32 33 # @return [Proc] { |instance| marshalled(instance) } 34 def marshal_proc 35 proc { |o| o.class.send(marshal_method, o).to_s } 36 end 37 38 # @param [:input, :output] target determines whether to produce the an 39 # unmarshal Proc for the rpc input parameter or 40 # its output parameter 41 # 42 # @return [Proc] An unmarshal proc { |marshalled(instance)| instance } 43 def unmarshal_proc(target) 44 fail ArgumentError unless [:input, :output].include?(target) 45 unmarshal_class = send(target) 46 unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream 47 proc { |o| unmarshal_class.send(unmarshal_method, o) } 48 end 49 50 def handle_request_response(active_call, mth, inter_ctx) 51 req = active_call.read_unary_request 52 call = active_call.single_req_view 53 54 inter_ctx.intercept!( 55 :request_response, 56 method: mth, 57 call: call, 58 request: req 59 ) do 60 resp = mth.call(req, call) 61 active_call.server_unary_response( 62 resp, 63 trailing_metadata: active_call.output_metadata 64 ) 65 end 66 end 67 68 def handle_client_streamer(active_call, mth, inter_ctx) 69 call = active_call.multi_req_view 70 71 inter_ctx.intercept!( 72 :client_streamer, 73 method: mth, 74 call: call 75 ) do 76 resp = mth.call(call) 77 active_call.server_unary_response( 78 resp, 79 trailing_metadata: active_call.output_metadata 80 ) 81 end 82 end 83 84 def handle_server_streamer(active_call, mth, inter_ctx) 85 req = active_call.read_unary_request 86 call = active_call.single_req_view 87 88 inter_ctx.intercept!( 89 :server_streamer, 90 method: mth, 91 call: call, 92 request: req 93 ) do 94 replies = mth.call(req, call) 95 replies.each { |r| active_call.remote_send(r) } 96 send_status(active_call, OK, 'OK', active_call.output_metadata) 97 end 98 end 99 100 ## 101 # @param [GRPC::ActiveCall] active_call 102 # @param [Method] mth 103 # @param [Array<GRPC::InterceptionContext>] inter_ctx 104 # 105 def handle_bidi_streamer(active_call, mth, inter_ctx) 106 active_call.run_server_bidi(mth, inter_ctx) 107 send_status(active_call, OK, 'OK', active_call.output_metadata) 108 end 109 110 ## 111 # @param [GRPC::ActiveCall] active_call The current active call object 112 # for the request 113 # @param [Method] mth The current RPC method being called 114 # @param [GRPC::InterceptionContext] inter_ctx The interception context 115 # being executed 116 # 117 def run_server_method(active_call, mth, inter_ctx = InterceptionContext.new) 118 # While a server method is running, it might be cancelled, its deadline 119 # might be reached, the handler could throw an unknown error, or a 120 # well-behaved handler could throw a StatusError. 121 if request_response? 122 handle_request_response(active_call, mth, inter_ctx) 123 elsif client_streamer? 124 handle_client_streamer(active_call, mth, inter_ctx) 125 elsif server_streamer? 126 handle_server_streamer(active_call, mth, inter_ctx) 127 else # is a bidi_stream 128 handle_bidi_streamer(active_call, mth, inter_ctx) 129 end 130 rescue BadStatus => e 131 # this is raised by handlers that want GRPC to send an application error 132 # code and detail message and some additional app-specific metadata. 133 GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}") 134 send_status(active_call, e.code, e.details, e.metadata) 135 rescue Core::CallError => e 136 # This is raised by GRPC internals but should rarely, if ever happen. 137 # Log it, but don't notify the other endpoint.. 138 GRPC.logger.warn("failed call: #{active_call}\n#{e}") 139 rescue Core::OutOfTime 140 # This is raised when active_call#method.call exceeds the deadline 141 # event. Send a status of deadline exceeded 142 GRPC.logger.warn("late call: #{active_call}") 143 send_status(active_call, DEADLINE_EXCEEDED, 'late') 144 rescue StandardError, NotImplementedError => e 145 # This will usuaally be an unhandled error in the handling code. 146 # Send back a UNKNOWN status to the client 147 # 148 # Note: this intentionally does not map NotImplementedError to 149 # UNIMPLEMENTED because NotImplementedError is intended for low-level 150 # OS interaction (e.g. syscalls) not supported by the current OS. 151 GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN") 152 GRPC.logger.warn(e) 153 send_status(active_call, UNKNOWN, "#{e.class}: #{e.message}") 154 end 155 156 def assert_arity_matches(mth) 157 # A bidi handler function can optionally be passed a second 158 # call object parameter for access to metadata, cancelling, etc. 159 if bidi_streamer? 160 if mth.arity != 2 && mth.arity != 1 161 fail arity_error(mth, 2, "should be #{mth.name}(req, call) or " \ 162 "#{mth.name}(req)") 163 end 164 elsif request_response? || server_streamer? 165 if mth.arity != 2 166 fail arity_error(mth, 2, "should be #{mth.name}(req, call)") 167 end 168 else 169 if mth.arity != 1 170 fail arity_error(mth, 1, "should be #{mth.name}(call)") 171 end 172 end 173 end 174 175 def request_response? 176 !input.is_a?(Stream) && !output.is_a?(Stream) 177 end 178 179 def client_streamer? 180 input.is_a?(Stream) && !output.is_a?(Stream) 181 end 182 183 def server_streamer? 184 !input.is_a?(Stream) && output.is_a?(Stream) 185 end 186 187 def bidi_streamer? 188 input.is_a?(Stream) && output.is_a?(Stream) 189 end 190 191 def arity_error(mth, want, msg) 192 "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}" 193 end 194 195 def send_status(active_client, code, details, metadata = {}) 196 details = 'Not sure why' if details.nil? 197 GRPC.logger.debug("Sending status #{code}:#{details}") 198 active_client.send_status(code, details, code == OK, metadata: metadata) 199 rescue StandardError => e 200 GRPC.logger.warn("Could not send status #{code}:#{details}") 201 GRPC.logger.warn(e) 202 end 203 end 204end 205