• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15require_relative 'client_stub'
16require_relative 'rpc_desc'
17
18# GRPC contains the General RPC module.
19module GRPC
20  # Provides behaviour used to implement schema-derived service classes.
21  #
22  # Is intended to be used to support both client and server
23  # IDL-schema-derived servers.
24  module GenericService
25    # creates a new string that is the underscore separate version of s.
26    #
27    # E.g,
28    # PrintHTML -> print_html
29    # AMethod -> a_method
30    # AnRpc -> an_rpc
31    #
32    # @param s [String] the string to be converted.
33    def self.underscore(s)
34      s = +s # Avoid mutating the argument, as it might be frozen.
35      s.gsub!(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
36      s.gsub!(/([a-z\d])([A-Z])/, '\1_\2')
37      s.tr!('-', '_')
38      s.downcase!
39      s
40    end
41
42    # Used to indicate that a name has already been specified
43    class DuplicateRpcName < StandardError
44      def initialize(name)
45        super("rpc (#{name}) is already defined")
46      end
47    end
48
49    # Provides a simple DSL to describe RPC services.
50    #
    # E.g, a Maths service that uses the serializable messages DivArgs,
    # DivReply and Num might define its endpoints in the following way:
    #
    # rpc :div, DivArgs, DivReply    # single request, single response
    # rpc :sum, stream(Num), Num     # streamed input, single response
    # rpc :fib, FibArgs, stream(Num) # single request, streamed response
    # rpc :div_many, stream(DivArgs), stream(DivReply)
    #                                # streamed req and resp
59    #
60    # Each 'rpc' adds an RpcDesc to classes including this module, and
61    # #assert_rpc_descs_have_methods is used to ensure the including class
62    # provides methods with signatures that support all the descriptors.
63    module Dsl
64      # This configures the method names that the serializable message
65      # implementation uses to marshal and unmarshal messages.
66      #
67      # - unmarshal_class method must be a class method on the serializable
68      # message type that takes a string (byte stream) and produces and object
69      #
70      # - marshal_class_method is called on a serializable message instance
71      # and produces a serialized string.
72      #
73      # The Dsl verifies that the types in the descriptor have both the
74      # unmarshal and marshal methods.
75      attr_writer(:marshal_class_method, :unmarshal_class_method)
76
77      # This allows configuration of the service name.
78      attr_accessor(:service_name)
79
80      # Adds an RPC spec.
81      #
82      # Takes the RPC name and the classes representing the types to be
83      # serialized, and adds them to the including classes rpc_desc hash.
84      #
85      # input and output should both have the methods #marshal and #unmarshal
86      # that are responsible for writing and reading an object instance from a
87      # byte buffer respectively.
88      #
89      # @param name [String] the name of the rpc
90      # @param input [Object] the input parameter's class
91      # @param output [Object] the output parameter's class
92      def rpc(name, input, output)
93        fail(DuplicateRpcName, name) if rpc_descs.key? name
94        assert_can_marshal(input)
95        assert_can_marshal(output)
96        rpc_descs[name] = RpcDesc.new(name, input, output,
97                                      marshal_class_method,
98                                      unmarshal_class_method)
99        define_method(GenericService.underscore(name.to_s).to_sym) do |*|
100          fail GRPC::BadStatus.new_status_exception(
101            GRPC::Core::StatusCodes::UNIMPLEMENTED)
102        end
103      end
104
105      def inherited(subclass)
106        # Each subclass should have a distinct class variable with its own
107        # rpc_descs
108        subclass.rpc_descs.merge!(rpc_descs)
109        subclass.service_name = service_name
110      end
111
112      # the name of the instance method used to marshal events to a byte
113      # stream.
114      def marshal_class_method
115        @marshal_class_method ||= :marshal
116      end
117
118      # the name of the class method used to unmarshal from a byte stream.
119      def unmarshal_class_method
120        @unmarshal_class_method ||= :unmarshal
121      end
122
123      def assert_can_marshal(cls)
124        cls = cls.type if cls.is_a? RpcDesc::Stream
125        mth = unmarshal_class_method
126        unless cls.methods.include? mth
127          fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
128        end
129        mth = marshal_class_method
130        return if cls.methods.include? mth
131        fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
132      end
133
134      # @param cls [Class] the class of a serializable type
135      # @return cls wrapped in a RpcDesc::Stream
136      def stream(cls)
137        assert_can_marshal(cls)
138        RpcDesc::Stream.new(cls)
139      end
140
141      # the RpcDescs defined for this GenericService, keyed by name.
142      def rpc_descs
143        @rpc_descs ||= {}
144      end
145
146      # Creates a rpc client class with methods for accessing the methods
147      # currently in rpc_descs.
148      def rpc_stub_class
149        descs = rpc_descs
150        route_prefix = service_name
151        Class.new(ClientStub) do
152          # @param host [String] the host the stub connects to
153          # @param creds [Core::ChannelCredentials|Symbol] The channel
154          #     credentials to use, or :this_channel_is_insecure otherwise
155          # @param kw [KeywordArgs] the channel arguments, plus any optional
156          #                         args for configuring the client's channel
157          def initialize(host, creds, **kw)
158            super(host, creds, **kw)
159          end
160
161          # Used define_method to add a method for each rpc_desc.  Each method
162          # calls the base class method for the given descriptor.
163          descs.each_pair do |name, desc|
164            mth_name = GenericService.underscore(name.to_s).to_sym
165            marshal = desc.marshal_proc
166            unmarshal = desc.unmarshal_proc(:output)
167            route = "/#{route_prefix}/#{name}"
168            if desc.request_response?
169              define_method(mth_name) do |req, metadata = {}|
170                GRPC.logger.debug("calling #{@host}:#{route}")
171                request_response(route, req, marshal, unmarshal, **metadata)
172              end
173            elsif desc.client_streamer?
174              define_method(mth_name) do |reqs, metadata = {}|
175                GRPC.logger.debug("calling #{@host}:#{route}")
176                client_streamer(route, reqs, marshal, unmarshal, **metadata)
177              end
178            elsif desc.server_streamer?
179              define_method(mth_name) do |req, metadata = {}, &blk|
180                GRPC.logger.debug("calling #{@host}:#{route}")
181                server_streamer(route, req, marshal, unmarshal, **metadata, &blk)
182              end
183            else  # is a bidi_stream
184              define_method(mth_name) do |reqs, metadata = {}, &blk|
185                GRPC.logger.debug("calling #{@host}:#{route}")
186                bidi_streamer(route, reqs, marshal, unmarshal, **metadata, &blk)
187              end
188            end
189          end
190        end
191      end
192    end
193
194    def self.included(o)
195      o.extend(Dsl)
196      # Update to the use the service name including module. Provide a default
197      # that can be nil e.g. when modules are declared dynamically.
198      return unless o.service_name.nil?
199      if o.name.nil?
200        o.service_name = 'GenericService'
201      else
202        modules = o.name.split('::')
203        if modules.length > 2
204          o.service_name = modules[modules.length - 2]
205        else
206          o.service_name = modules.first
207        end
208      end
209    end
210  end
211end
212