1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 package dev.pigweed.pw_rpc; 16 17 import com.google.protobuf.MessageLite; 18 import dev.pigweed.pw_rpc.FutureCall.StreamResponseFuture; 19 import dev.pigweed.pw_rpc.FutureCall.UnaryResponseFuture; 20 import java.util.function.BiFunction; 21 import java.util.function.Consumer; 22 import javax.annotation.Nullable; 23 24 /** 25 * Represents a method ready to be invoked on a particular RPC channel. 26 * 27 * Invoking an RPC with a method client may throw exceptions: 28 * 29 * TODO: b/301644223 - This class should be split into four types -- one for each method type. The 30 * call type checks should be done when the object is created. Also, the client should be typed 31 * on the request/response. 32 */ 33 public class MethodClient { 34 private final Client client; 35 private final int channelId; 36 private final Method method; 37 private final StreamObserver<? extends MessageLite> defaultObserver; 38 MethodClient( Client client, int channelId, Method method, StreamObserver<MessageLite> defaultObserver)39 MethodClient( 40 Client client, int channelId, Method method, StreamObserver<MessageLite> defaultObserver) { 41 this.client = client; 42 this.channelId = channelId; 43 this.method = method; 44 this.defaultObserver = defaultObserver; 45 } 46 method()47 public final Method method() { 48 return method; 49 } 50 51 /** 52 * Invokes a unary RPC. Uses the default StreamObserver for RPC events. 53 * 54 * @throws InvalidRpcChannelException the client has no channel with this ID 55 * @throws InvalidRpcServiceException if the service was removed from the client 56 * @throws InvalidRpcServiceMethodException if the method was removed or changed since this 57 * MethodClient was created 58 */ invokeUnary(MessageLite request)59 public Call invokeUnary(MessageLite request) throws ChannelOutputException { 60 return invokeUnary(request, defaultObserver()); 61 } 62 63 /** Invokes a unary RPC. Uses the provided StreamObserver for RPC events. */ invokeUnary(MessageLite request, StreamObserver<? extends MessageLite> observer)64 public Call invokeUnary(MessageLite request, StreamObserver<? extends MessageLite> observer) 65 throws ChannelOutputException { 66 checkCallType(Method.Type.UNARY); 67 return client.invokeRpc(channelId, method, StreamObserverCall.getFactory(observer), request); 68 } 69 70 /** Invokes a unary RPC with a future that collects the response. */ invokeUnaryFuture( MessageLite request)71 public <ResponseT extends MessageLite> Call.UnaryFuture<ResponseT> invokeUnaryFuture( 72 MessageLite request) { 73 checkCallType(Method.Type.UNARY); 74 return invokeFuture(UnaryResponseFuture::new, request); 75 } 76 77 /** 78 * Creates a call object for a unary RPC without starting the RPC on the server. This can be used 79 * to start listening to responses to an RPC before the RPC server is available. 80 * 81 * <p>The RPC remains open until it is completed by the server with a response or error packet or 82 * cancelled. 83 */ openUnary(StreamObserver<? extends MessageLite> observer)84 public Call openUnary(StreamObserver<? extends MessageLite> observer) { 85 checkCallType(Method.Type.UNARY); 86 return client.openRpc(channelId, method, StreamObserverCall.getFactory(observer)); 87 } 88 89 /** Invokes a server streaming RPC. Uses the default StreamObserver for RPC events. */ invokeServerStreaming(MessageLite request)90 public Call invokeServerStreaming(MessageLite request) throws ChannelOutputException { 91 return invokeServerStreaming(request, defaultObserver()); 92 } 93 94 /** Invokes a server streaming RPC. Uses the provided StreamObserver for RPC events. */ invokeServerStreaming(MessageLite request, StreamObserver<? extends MessageLite> observer)95 public Call invokeServerStreaming(MessageLite request, 96 StreamObserver<? extends MessageLite> observer) throws ChannelOutputException { 97 checkCallType(Method.Type.SERVER_STREAMING); 98 return client.invokeRpc(channelId, method, StreamObserverCall.getFactory(observer), request); 99 } 100 101 /** Invokes a server streaming RPC with a future that collects the responses. */ invokeServerStreamingFuture( MessageLite request, Consumer<? extends MessageLite> onNext)102 public Call.ServerStreamingFuture invokeServerStreamingFuture( 103 MessageLite request, Consumer<? extends MessageLite> onNext) { 104 checkCallType(Method.Type.SERVER_STREAMING); 105 return invokeFuture(StreamResponseFuture.getFactory(onNext), request); 106 } 107 108 /** 109 * Creates a call object for a server streaming RPC without starting the RPC on the server. This 110 * can be used to start listening to responses to an RPC before the RPC server is available. 111 * 112 * <p>The RPC remains open until it is completed by the server with a response or error packet or 113 * cancelled. 114 */ openServerStreaming(StreamObserver<? extends MessageLite> observer)115 public Call openServerStreaming(StreamObserver<? extends MessageLite> observer) { 116 checkCallType(Method.Type.SERVER_STREAMING); 117 return client.openRpc(channelId, method, StreamObserverCall.getFactory(observer)); 118 } 119 120 /** Invokes a client streaming RPC. Uses the default StreamObserver for RPC events. */ invokeClientStreaming()121 public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> invokeClientStreaming() 122 throws ChannelOutputException { 123 return invokeClientStreaming(defaultObserver()); 124 } 125 126 /** Invokes a client streaming RPC. Uses the provided StreamObserver for RPC events. */ invokeClientStreaming( StreamObserver<? extends MessageLite> observer)127 public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> invokeClientStreaming( 128 StreamObserver<? extends MessageLite> observer) throws ChannelOutputException { 129 checkCallType(Method.Type.CLIENT_STREAMING); 130 return client.invokeRpc(channelId, method, StreamObserverCall.getFactory(observer), null); 131 } 132 133 /** Invokes a client streaming RPC with a future that collects the response. */ 134 public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> invokeClientStreamingFuture()135 invokeClientStreamingFuture() { 136 checkCallType(Method.Type.CLIENT_STREAMING); 137 return invokeFuture(UnaryResponseFuture::new, null); 138 } 139 140 /** 141 * Creates a call object for a client streaming RPC without starting the RPC on the server. This 142 * can be used to start listening to responses to an RPC before the RPC server is available. 143 * 144 * <p>The RPC remains open until it is completed by the server with a response or error packet or 145 * cancelled. 146 */ openClientStreaming( StreamObserver<? extends MessageLite> observer)147 public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> openClientStreaming( 148 StreamObserver<? extends MessageLite> observer) { 149 checkCallType(Method.Type.CLIENT_STREAMING); 150 return client.openRpc(channelId, method, StreamObserverCall.getFactory(observer)); 151 } 152 153 /** Invokes a bidirectional streaming RPC. Uses the default StreamObserver for RPC events. */ 154 public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> invokeBidirectionalStreaming()155 invokeBidirectionalStreaming() throws ChannelOutputException { 156 return invokeBidirectionalStreaming(defaultObserver()); 157 } 158 159 /** Invokes a bidirectional streaming RPC. Uses the provided StreamObserver for RPC events. */ invokeBidirectionalStreaming( StreamObserver<? extends MessageLite> observer)160 public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> invokeBidirectionalStreaming( 161 StreamObserver<? extends MessageLite> observer) throws ChannelOutputException { 162 checkCallType(Method.Type.BIDIRECTIONAL_STREAMING); 163 return client.invokeRpc(channelId, method, StreamObserverCall.getFactory(observer), null); 164 } 165 166 /** Invokes a bidirectional streaming RPC with a future that finishes when the RPC finishes. */ 167 public <RequestT extends MessageLite, ResponseT extends MessageLite> invokeBidirectionalStreamingFuture( Consumer<ResponseT> onNext)168 Call.BidirectionalStreamingFuture<RequestT> invokeBidirectionalStreamingFuture( 169 Consumer<ResponseT> onNext) { 170 checkCallType(Method.Type.BIDIRECTIONAL_STREAMING); 171 return invokeFuture(StreamResponseFuture.getFactory(onNext), null); 172 } 173 174 /** 175 * Creates a call object for a bidirectional streaming RPC without starting the RPC on the server. 176 * This can be used to start listening to responses to an RPC before the RPC server is available. 177 * 178 * <p>The RPC remains open until it is completed by the server with a response or error packet or 179 * cancelled. 180 */ openBidirectionalStreaming( StreamObserver<? extends MessageLite> observer)181 public <RequestT extends MessageLite> Call.ClientStreaming<RequestT> openBidirectionalStreaming( 182 StreamObserver<? extends MessageLite> observer) { 183 checkCallType(Method.Type.BIDIRECTIONAL_STREAMING); 184 return client.openRpc(channelId, method, StreamObserverCall.getFactory(observer)); 185 } 186 187 @SuppressWarnings("unchecked") defaultObserver()188 private <ResponseT extends MessageLite> StreamObserver<ResponseT> defaultObserver() { 189 return (StreamObserver<ResponseT>) defaultObserver; 190 } 191 192 public <RequestT extends MessageLite, ResponseT extends MessageLite, CallT 193 extends FutureCall<RequestT, ResponseT, ?>> CallT invokeFuture(BiFunction<Endpoint, PendingRpc, CallT> createCall, @Nullable MessageLite request)194 invokeFuture(BiFunction<Endpoint, PendingRpc, CallT> createCall, @Nullable MessageLite request) { 195 try { 196 return client.invokeRpc(channelId, method, createCall, request); 197 } catch (ChannelOutputException e) { 198 throw new AssertionError("Starting a future-based RPC call should never throw", e); 199 } 200 } 201 checkCallType(Method.Type expected)202 private void checkCallType(Method.Type expected) { 203 if (!method.type().equals(expected)) { 204 throw new UnsupportedOperationException(String.format( 205 "%s is a %s method, but it was invoked as a %s method. RPCs must be invoked by the" 206 + " appropriate invoke function.", 207 method.fullName(), 208 method.type().sentenceName(), 209 expected.sentenceName())); 210 } 211 } 212 } 213