# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require_relative 'client_stub'
require_relative 'rpc_desc'

# GRPC contains the General RPC module.
module GRPC
  # Provides behaviour used to implement schema-derived service classes.
  #
  # Is intended to be used to support both client and server
  # IDL-schema-derived servers.
  module GenericService
    # Creates a new string that is the underscore-separated version of s.
    #
    # E.g,
    #   PrintHTML -> print_html
    #   AMethod -> a_method
    #   AnRpc -> an_rpc
    #
    # The argument is never modified, so frozen strings are accepted and the
    # caller's string keeps its original value.
    #
    # @param s [String] the string to be converted.
    # @return [String] a new, lower-cased, underscore-separated string.
    def self.underscore(s)
      # Non-destructive variants are used on purpose: the bang versions
      # would rewrite the caller's string in place and raise on frozen input.
      s.gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
       .gsub(/([a-z\d])([A-Z])/, '\1_\2')
       .tr('-', '_')
       .downcase
    end

    # Used to indicate that a name has already been specified
    class DuplicateRpcName < StandardError
      # @param name [Object] the rpc name that was registered twice; it is
      #   interpolated into the error message.
      def initialize(name)
        super("rpc (#{name}) is already defined")
      end
    end

    # Provides a simple DSL to describe RPC services.
    #
    # E.g, a Maths service that uses the serializable messages DivArgs,
    # DivReply and Num might define its endpoint uses the following way:
    #
    # rpc :div, DivArgs, DivReply    # single request, single response
    # rpc :sum, stream(Num), Num     # streamed input, single response
    # rpc :fib, FibArgs, stream(Num) # single request, streamed response
    # rpc :div_many, stream(DivArgs), stream(DivReply)
    #                                # streamed req and resp
    #
    # Each 'rpc' adds an RpcDesc to classes including this module, and
    # #assert_rpc_descs_have_methods is used to ensure the including class
    # provides methods with signatures that support all the descriptors.
    module Dsl
      # This configures the method names that the serializable message
      # implementation uses to marshal and unmarshal messages.
      #
      # - unmarshal_class_method must be a class method on the serializable
      # message type that takes a string (byte stream) and produces an object
      #
      # - marshal_class_method is called on a serializable message instance
      # and produces a serialized string.
      #
      # The Dsl verifies (see #assert_can_marshal) that the types in the
      # descriptor respond to both the unmarshal and marshal method names.
      attr_writer(:marshal_class_method, :unmarshal_class_method)

      # This allows configuration of the service name.
      attr_accessor(:service_name)

      # Adds an RPC spec.
      #
      # Takes the RPC name and the classes representing the types to be
      # serialized, and adds them to the including classes rpc_desc hash.
      # It also defines a default instance method, named after the
      # underscored rpc name, that accepts two arguments and fails with an
      # UNIMPLEMENTED status until the including class overrides it.
      #
      # input and output should both have the methods #marshal and #unmarshal
      # that are responsible for writing and reading an object instance from a
      # byte buffer respectively.
      #
      # @param name [String, Symbol] the name of the rpc
      # @param input [Object] the input parameter's class
      # @param output [Object] the output parameter's class
      # @raise [DuplicateRpcName] if an rpc with this name already exists
      # @raise [ArgumentError] if input or output cannot be (un)marshalled
      def rpc(name, input, output)
        fail(DuplicateRpcName, name) if rpc_descs.key? name
        assert_can_marshal(input)
        assert_can_marshal(output)
        rpc_descs[name] = RpcDesc.new(name, input, output,
                                      marshal_class_method,
                                      unmarshal_class_method)
        define_method(GenericService.underscore(name.to_s).to_sym) do |_, _|
          fail GRPC::BadStatus.new_status_exception(
            GRPC::Core::StatusCodes::UNIMPLEMENTED)
        end
      end

      # Copies the rpc descriptors and service name of this class into each
      # new subclass.
      #
      # @param subclass [Class] the class that has just inherited from self
      def inherited(subclass)
        # Each subclass should have a distinct class variable with its own
        # rpc_descs
        subclass.rpc_descs.merge!(rpc_descs)
        subclass.service_name = service_name
        # Let any other inherited hooks in the ancestor chain run as well.
        super
      end

      # the name of the instance method used to marshal events to a byte
      # stream.
      def marshal_class_method
        @marshal_class_method ||= :marshal
      end

      # the name of the class method used to unmarshal from a byte stream.
      def unmarshal_class_method
        @unmarshal_class_method ||= :unmarshal
      end

      # Checks that cls (or, for a RpcDesc::Stream, its wrapped type) lists
      # both the configured unmarshal and marshal method names among its
      # methods.
      #
      # @param cls [Class, RpcDesc::Stream] the type to check
      # @raise [ArgumentError] if either method is missing
      def assert_can_marshal(cls)
        cls = cls.type if cls.is_a? RpcDesc::Stream
        mth = unmarshal_class_method
        unless cls.methods.include? mth
          fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
        end
        mth = marshal_class_method
        return if cls.methods.include? mth
        fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
      end

      # @param cls [Class] the class of a serializable type
      # @return cls wrapped in a RpcDesc::Stream
      def stream(cls)
        assert_can_marshal(cls)
        RpcDesc::Stream.new(cls)
      end

      # the RpcDescs defined for this GenericService, keyed by name.
      def rpc_descs
        @rpc_descs ||= {}
      end

      # Creates a rpc client class with methods for accessing the methods
      # currently in rpc_descs.
      #
      # @return [Class] an anonymous subclass of ClientStub with one instance
      #   method per rpc descriptor, named after the underscored rpc name.
      def rpc_stub_class
        # Captured as locals because the Class.new block below is evaluated
        # with the new class as self, where rpc_descs/service_name of this
        # service are not reachable.
        descs = rpc_descs
        route_prefix = service_name
        Class.new(ClientStub) do
          # @param host [String] the host the stub connects to
          # @param creds [Core::ChannelCredentials|Symbol] The channel
          #     credentials to use, or :this_channel_is_insecure otherwise
          # @param kw [KeywordArgs] the channel arguments, plus any optional
          #     args for configuring the client's channel
          def initialize(host, creds, **kw)
            super(host, creds, **kw)
          end

          # Used define_method to add a method for each rpc_desc.  Each method
          # calls the base class method for the given descriptor.
          descs.each_pair do |name, desc|
            mth_name = GenericService.underscore(name.to_s).to_sym
            marshal = desc.marshal_proc
            unmarshal = desc.unmarshal_proc(:output)
            route = "/#{route_prefix}/#{name}"
            if desc.request_response?
              define_method(mth_name) do |req, metadata = {}|
                GRPC.logger.debug("calling #{@host}:#{route}")
                request_response(route, req, marshal, unmarshal, metadata)
              end
            elsif desc.client_streamer?
              define_method(mth_name) do |reqs, metadata = {}|
                GRPC.logger.debug("calling #{@host}:#{route}")
                client_streamer(route, reqs, marshal, unmarshal, metadata)
              end
            elsif desc.server_streamer?
              define_method(mth_name) do |req, metadata = {}, &blk|
                GRPC.logger.debug("calling #{@host}:#{route}")
                server_streamer(route, req, marshal, unmarshal, metadata, &blk)
              end
            else  # is a bidi_stream
              define_method(mth_name) do |reqs, metadata = {}, &blk|
                GRPC.logger.debug("calling #{@host}:#{route}")
                bidi_streamer(route, reqs, marshal, unmarshal, metadata, &blk)
              end
            end
          end
        end
      end
    end

    # Extends the including class with the Dsl and gives it a default
    # service name when none has been set.
    #
    # @param o [Module] the class or module including GenericService
    def self.included(o)
      o.extend(Dsl)
      # Update to the use the service name including module. Provide a default
      # that can be nil e.g. when modules are declared dynamically.
      return unless o.service_name.nil?
      if o.name.nil?
        o.service_name = 'GenericService'
      else
        modules = o.name.split('::')
        # With more than two segments the penultimate one names the service;
        # otherwise fall back to the first segment.
        o.service_name = modules.length > 2 ? modules[-2] : modules.first
      end
    end
  end
end