• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 package dev.pigweed.pw_rpc;
16 
17 import com.google.protobuf.MessageLite;
18 import dev.pigweed.pw_rpc.FutureCall.StreamResponseFuture;
19 import dev.pigweed.pw_rpc.FutureCall.UnaryResponseFuture;
20 import java.util.function.BiFunction;
21 import java.util.function.Consumer;
22 import javax.annotation.Nullable;
23 
24 /**
25  * Represents a method ready to be invoked on a particular RPC channel.
26  *
27  * Invoking an RPC with a method client may throw exceptions:
28  *
29  * TODO: b/301644223 - This class should be split into four types -- one for each method type. The
30  *     call type checks should be done when the object is created. Also, the client should be typed
31  *     on the request/response.
32  */
33 public class MethodClient {
34   private final Client client;
35   private final int channelId;
36   private final Method method;
37   private final StreamObserver<? extends MessageLite> defaultObserver;
38 
MethodClient( Client client, int channelId, Method method, StreamObserver<MessageLite> defaultObserver)39   MethodClient(
40       Client client, int channelId, Method method, StreamObserver<MessageLite> defaultObserver) {
41     this.client = client;
42     this.channelId = channelId;
43     this.method = method;
44     this.defaultObserver = defaultObserver;
45   }
46 
method()47   public final Method method() {
48     return method;
49   }
50 
51   /**
52    * Invokes a unary RPC. Uses the default StreamObserver for RPC events.
53    *
54    * @throws InvalidRpcChannelException the client has no channel with this ID
55    * @throws InvalidRpcServiceException if the service was removed from the client
56    * @throws InvalidRpcServiceMethodException if the method was removed or changed since this
57    *     MethodClient was created
58    */
invokeUnary(MessageLite request)59   public Call invokeUnary(MessageLite request) throws ChannelOutputException {
60     return invokeUnary(request, defaultObserver());
61   }
62 
63   /** Invokes a unary RPC. Uses the provided StreamObserver for RPC events. */
invokeUnary(MessageLite request, StreamObserver<? extends MessageLite> observer)64   public Call invokeUnary(MessageLite request, StreamObserver<? extends MessageLite> observer)
65       throws ChannelOutputException {
66     checkCallType(Method.Type.UNARY);
67     return client.invokeRpc(channelId, method, StreamObserverCall.getFactory(observer), request);
68   }
69 
70   /** Invokes a unary RPC with a future that collects the response. */
invokeUnaryFuture( MessageLite request)71   public <ResponseT extends MessageLite> Call.UnaryFuture<ResponseT> invokeUnaryFuture(
72       MessageLite request) {
73     checkCallType(Method.Type.UNARY);
74     return invokeFuture(UnaryResponseFuture::new, request);
75   }
76 
77   /**
78    * Creates a call object for a unary RPC without starting the RPC on the server. This can be used
79    * to start listening to responses to an RPC before the RPC server is available.
80    *
81    * <p>The RPC remains open until it is completed by the server with a response or error packet or
82    * cancelled.
83    */
openUnary(StreamObserver<? extends MessageLite> observer)84   public Call openUnary(StreamObserver<? extends MessageLite> observer) {
85     checkCallType(Method.Type.UNARY);
86     return client.openRpc(channelId, method, StreamObserverCall.getFactory(observer));
87   }
88 
89   /** Invokes a server streaming RPC. Uses the default StreamObserver for RPC events. */
invokeServerStreaming(MessageLite request)90   public Call invokeServerStreaming(MessageLite request) throws ChannelOutputException {
91     return invokeServerStreaming(request, defaultObserver());
92   }
93 
94   /** Invokes a server streaming RPC. Uses the provided StreamObserver for RPC events. */
invokeServerStreaming(MessageLite request, StreamObserver<? extends MessageLite> observer)95   public Call invokeServerStreaming(MessageLite request,
96       StreamObserver<? extends MessageLite> observer) throws ChannelOutputException {
97     checkCallType(Method.Type.SERVER_STREAMING);
98     return client.invokeRpc(channelId, method, StreamObserverCall.getFactory(observer), request);
99   }
100 
101   /** Invokes a server streaming RPC with a future that collects the responses. */
invokeServerStreamingFuture( MessageLite request, Consumer<? extends MessageLite> onNext)102   public Call.ServerStreamingFuture invokeServerStreamingFuture(
103       MessageLite request, Consumer<? extends MessageLite> onNext) {
104     checkCallType(Method.Type.SERVER_STREAMING);
105     return invokeFuture(StreamResponseFuture.getFactory(onNext), request);
106   }
107 
108   /**
109    * Creates a call object for a server streaming RPC without starting the RPC on the server. This
110    * can be used to start listening to responses to an RPC before the RPC server is available.
111    *
112    * <p>The RPC remains open until it is completed by the server with a response or error packet or
113    * cancelled.
114    */
openServerStreaming(StreamObserver<? extends MessageLite> observer)115   public Call openServerStreaming(StreamObserver<? extends MessageLite> observer) {
116     checkCallType(Method.Type.SERVER_STREAMING);
117     return client.openRpc(channelId, method, StreamObserverCall.getFactory(observer));
118   }
119 
120   /** Invokes a client streaming RPC. Uses the default StreamObserver for RPC events. */
invokeClientStreaming()121   public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> invokeClientStreaming()
122       throws ChannelOutputException {
123     return invokeClientStreaming(defaultObserver());
124   }
125 
126   /** Invokes a client streaming RPC. Uses the provided StreamObserver for RPC events. */
invokeClientStreaming( StreamObserver<? extends MessageLite> observer)127   public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> invokeClientStreaming(
128       StreamObserver<? extends MessageLite> observer) throws ChannelOutputException {
129     checkCallType(Method.Type.CLIENT_STREAMING);
130     return client.invokeRpc(channelId, method, StreamObserverCall.getFactory(observer), null);
131   }
132 
133   /** Invokes a client streaming RPC with a future that collects the response. */
134   public <RequestT extends MessageLite> Call.ClientStreaming<RequestT>
invokeClientStreamingFuture()135   invokeClientStreamingFuture() {
136     checkCallType(Method.Type.CLIENT_STREAMING);
137     return invokeFuture(UnaryResponseFuture::new, null);
138   }
139 
140   /**
141    * Creates a call object for a client streaming RPC without starting the RPC on the server. This
142    * can be used to start listening to responses to an RPC before the RPC server is available.
143    *
144    * <p>The RPC remains open until it is completed by the server with a response or error packet or
145    * cancelled.
146    */
openClientStreaming( StreamObserver<? extends MessageLite> observer)147   public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> openClientStreaming(
148       StreamObserver<? extends MessageLite> observer) {
149     checkCallType(Method.Type.CLIENT_STREAMING);
150     return client.openRpc(channelId, method, StreamObserverCall.getFactory(observer));
151   }
152 
153   /** Invokes a bidirectional streaming RPC. Uses the default StreamObserver for RPC events. */
154   public <RequestT extends MessageLite> Call.ClientStreaming<RequestT>
invokeBidirectionalStreaming()155   invokeBidirectionalStreaming() throws ChannelOutputException {
156     return invokeBidirectionalStreaming(defaultObserver());
157   }
158 
159   /** Invokes a bidirectional streaming RPC. Uses the provided StreamObserver for RPC events. */
invokeBidirectionalStreaming( StreamObserver<? extends MessageLite> observer)160   public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> invokeBidirectionalStreaming(
161       StreamObserver<? extends MessageLite> observer) throws ChannelOutputException {
162     checkCallType(Method.Type.BIDIRECTIONAL_STREAMING);
163     return client.invokeRpc(channelId, method, StreamObserverCall.getFactory(observer), null);
164   }
165 
166   /** Invokes a bidirectional streaming RPC with a future that finishes when the RPC finishes. */
167   public <RequestT extends MessageLite, ResponseT extends MessageLite>
invokeBidirectionalStreamingFuture( Consumer<ResponseT> onNext)168       Call.BidirectionalStreamingFuture<RequestT> invokeBidirectionalStreamingFuture(
169           Consumer<ResponseT> onNext) {
170     checkCallType(Method.Type.BIDIRECTIONAL_STREAMING);
171     return invokeFuture(StreamResponseFuture.getFactory(onNext), null);
172   }
173 
174   /**
175    * Creates a call object for a bidirectional streaming RPC without starting the RPC on the server.
176    * This can be used to start listening to responses to an RPC before the RPC server is available.
177    *
178    * <p>The RPC remains open until it is completed by the server with a response or error packet or
179    * cancelled.
180    */
openBidirectionalStreaming( StreamObserver<? extends MessageLite> observer)181   public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> openBidirectionalStreaming(
182       StreamObserver<? extends MessageLite> observer) {
183     checkCallType(Method.Type.BIDIRECTIONAL_STREAMING);
184     return client.openRpc(channelId, method, StreamObserverCall.getFactory(observer));
185   }
186 
187   @SuppressWarnings("unchecked")
defaultObserver()188   private <ResponseT extends MessageLite> StreamObserver<ResponseT> defaultObserver() {
189     return (StreamObserver<ResponseT>) defaultObserver;
190   }
191 
192   public <RequestT extends MessageLite, ResponseT extends MessageLite, CallT
193               extends FutureCall<RequestT, ResponseT, ?>> CallT
invokeFuture(BiFunction<Endpoint, PendingRpc, CallT> createCall, @Nullable MessageLite request)194   invokeFuture(BiFunction<Endpoint, PendingRpc, CallT> createCall, @Nullable MessageLite request) {
195     try {
196       return client.invokeRpc(channelId, method, createCall, request);
197     } catch (ChannelOutputException e) {
198       throw new AssertionError("Starting a future-based RPC call should never throw", e);
199     }
200   }
201 
checkCallType(Method.Type expected)202   private void checkCallType(Method.Type expected) {
203     if (!method.type().equals(expected)) {
204       throw new UnsupportedOperationException(String.format(
205           "%s is a %s method, but it was invoked as a %s method. RPCs must be invoked by the"
206               + " appropriate invoke function.",
207           method.fullName(),
208           method.type().sentenceName(),
209           expected.sentenceName()));
210     }
211   }
212 }
213