1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15require_relative 'client_stub' 16require_relative 'rpc_desc' 17 18# GRPC contains the General RPC module. 19module GRPC 20 # Provides behaviour used to implement schema-derived service classes. 21 # 22 # Is intended to be used to support both client and server 23 # IDL-schema-derived servers. 24 module GenericService 25 # creates a new string that is the underscore separate version of s. 26 # 27 # E.g, 28 # PrintHTML -> print_html 29 # AMethod -> a_method 30 # AnRpc -> an_rpc 31 # 32 # @param s [String] the string to be converted. 33 def self.underscore(s) 34 s = +s # Avoid mutating the argument, as it might be frozen. 35 s.gsub!(/([A-Z]+)([A-Z][a-z])/, '\1_\2') 36 s.gsub!(/([a-z\d])([A-Z])/, '\1_\2') 37 s.tr!('-', '_') 38 s.downcase! 39 s 40 end 41 42 # Used to indicate that a name has already been specified 43 class DuplicateRpcName < StandardError 44 def initialize(name) 45 super("rpc (#{name}) is already defined") 46 end 47 end 48 49 # Provides a simple DSL to describe RPC services. 50 # 51 # E.g, a Maths service that uses the serializable messages DivArgs, 52 # DivReply and Num might define its endpoint uses the following way: 53 # 54 # rpc :div DivArgs, DivReply # single request, single response 55 # rpc :sum stream(Num), Num # streamed input, single response 56 # rpc :fib FibArgs, stream(Num) # single request, streamed response 57 # rpc :div_many stream(DivArgs), stream(DivReply) 58 # # streamed req and resp 59 # 60 # Each 'rpc' adds an RpcDesc to classes including this module, and 61 # #assert_rpc_descs_have_methods is used to ensure the including class 62 # provides methods with signatures that support all the descriptors. 63 module Dsl 64 # This configures the method names that the serializable message 65 # implementation uses to marshal and unmarshal messages. 66 # 67 # - unmarshal_class method must be a class method on the serializable 68 # message type that takes a string (byte stream) and produces and object 69 # 70 # - marshal_class_method is called on a serializable message instance 71 # and produces a serialized string. 72 # 73 # The Dsl verifies that the types in the descriptor have both the 74 # unmarshal and marshal methods. 75 attr_writer(:marshal_class_method, :unmarshal_class_method) 76 77 # This allows configuration of the service name. 78 attr_accessor(:service_name) 79 80 # Adds an RPC spec. 81 # 82 # Takes the RPC name and the classes representing the types to be 83 # serialized, and adds them to the including classes rpc_desc hash. 84 # 85 # input and output should both have the methods #marshal and #unmarshal 86 # that are responsible for writing and reading an object instance from a 87 # byte buffer respectively. 88 # 89 # @param name [String] the name of the rpc 90 # @param input [Object] the input parameter's class 91 # @param output [Object] the output parameter's class 92 def rpc(name, input, output) 93 fail(DuplicateRpcName, name) if rpc_descs.key? name 94 assert_can_marshal(input) 95 assert_can_marshal(output) 96 rpc_descs[name] = RpcDesc.new(name, input, output, 97 marshal_class_method, 98 unmarshal_class_method) 99 define_method(GenericService.underscore(name.to_s).to_sym) do |*| 100 fail GRPC::BadStatus.new_status_exception( 101 GRPC::Core::StatusCodes::UNIMPLEMENTED) 102 end 103 end 104 105 def inherited(subclass) 106 # Each subclass should have a distinct class variable with its own 107 # rpc_descs 108 subclass.rpc_descs.merge!(rpc_descs) 109 subclass.service_name = service_name 110 end 111 112 # the name of the instance method used to marshal events to a byte 113 # stream. 114 def marshal_class_method 115 @marshal_class_method ||= :marshal 116 end 117 118 # the name of the class method used to unmarshal from a byte stream. 119 def unmarshal_class_method 120 @unmarshal_class_method ||= :unmarshal 121 end 122 123 def assert_can_marshal(cls) 124 cls = cls.type if cls.is_a? RpcDesc::Stream 125 mth = unmarshal_class_method 126 unless cls.methods.include? mth 127 fail(ArgumentError, "#{cls} needs #{cls}.#{mth}") 128 end 129 mth = marshal_class_method 130 return if cls.methods.include? mth 131 fail(ArgumentError, "#{cls} needs #{cls}.#{mth}") 132 end 133 134 # @param cls [Class] the class of a serializable type 135 # @return cls wrapped in a RpcDesc::Stream 136 def stream(cls) 137 assert_can_marshal(cls) 138 RpcDesc::Stream.new(cls) 139 end 140 141 # the RpcDescs defined for this GenericService, keyed by name. 142 def rpc_descs 143 @rpc_descs ||= {} 144 end 145 146 # Creates a rpc client class with methods for accessing the methods 147 # currently in rpc_descs. 148 def rpc_stub_class 149 descs = rpc_descs 150 route_prefix = service_name 151 Class.new(ClientStub) do 152 # @param host [String] the host the stub connects to 153 # @param creds [Core::ChannelCredentials|Symbol] The channel 154 # credentials to use, or :this_channel_is_insecure otherwise 155 # @param kw [KeywordArgs] the channel arguments, plus any optional 156 # args for configuring the client's channel 157 def initialize(host, creds, **kw) 158 super(host, creds, **kw) 159 end 160 161 # Used define_method to add a method for each rpc_desc. Each method 162 # calls the base class method for the given descriptor. 163 descs.each_pair do |name, desc| 164 mth_name = GenericService.underscore(name.to_s).to_sym 165 marshal = desc.marshal_proc 166 unmarshal = desc.unmarshal_proc(:output) 167 route = "/#{route_prefix}/#{name}" 168 if desc.request_response? 169 define_method(mth_name) do |req, metadata = {}| 170 GRPC.logger.debug("calling #{@host}:#{route}") 171 request_response(route, req, marshal, unmarshal, **metadata) 172 end 173 elsif desc.client_streamer? 174 define_method(mth_name) do |reqs, metadata = {}| 175 GRPC.logger.debug("calling #{@host}:#{route}") 176 client_streamer(route, reqs, marshal, unmarshal, **metadata) 177 end 178 elsif desc.server_streamer? 179 define_method(mth_name) do |req, metadata = {}, &blk| 180 GRPC.logger.debug("calling #{@host}:#{route}") 181 server_streamer(route, req, marshal, unmarshal, **metadata, &blk) 182 end 183 else # is a bidi_stream 184 define_method(mth_name) do |reqs, metadata = {}, &blk| 185 GRPC.logger.debug("calling #{@host}:#{route}") 186 bidi_streamer(route, reqs, marshal, unmarshal, **metadata, &blk) 187 end 188 end 189 end 190 end 191 end 192 end 193 194 def self.included(o) 195 o.extend(Dsl) 196 # Update to the use the service name including module. Provide a default 197 # that can be nil e.g. when modules are declared dynamically. 198 return unless o.service_name.nil? 199 if o.name.nil? 200 o.service_name = 'GenericService' 201 else 202 modules = o.name.split('::') 203 if modules.length > 2 204 o.service_name = modules[modules.length - 2] 205 else 206 o.service_name = modules.first 207 end 208 end 209 end 210 end 211end 212